Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ bug_fixes:
Fixed a crashing bug in the HTTP filter when a stream was already above the downstream write-buffer
high watermark at filter-chain construction time. Downstream watermark callback registration is
now deferred until the in-module filter has been constructed.
- area: io_uring
change: |
Bounded the number of injected io_uring completions processed per dispatcher tick to keep a
steady stream of injected completions (or completion callbacks that inject more completions)
from starving other work on the dispatcher thread. Any completions left over after the cap is
reached stay queued and are processed on subsequent ticks via an eventfd self-poke.

removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
31 changes: 25 additions & 6 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ bool isIoUringSupported() {
return is_supported;
}

IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling)
: cqes_(io_uring_size, nullptr) {
IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling,
uint32_t max_injected_completions_per_event)
: cqes_(io_uring_size, nullptr),
max_injected_completions_per_event_(max_injected_completions_per_event) {
RELEASE_ASSERT(max_injected_completions_per_event_ > 0,
"max_injected_completions_per_event must be > 0");
struct io_uring_params p {};
if (use_submission_queue_polling) {
p.flags |= IORING_SETUP_SQPOLL;
Expand Down Expand Up @@ -75,10 +79,12 @@ void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) {
io_uring_cq_advance(&ring_, count);

ENVOY_LOG(trace, "the num of injected completion is {}", injected_completions_.size());
// TODO(soulxu): Add bound here to avoid too many completion to stuck the thread too
// long.
// Iterate the injected completion.
while (!injected_completions_.empty()) {
// Drain at most `max_injected_completions_per_event_` injected completions per event-loop tick
// so a steady stream of injections (or completion callbacks that inject more completions) can
// never starve other work on the dispatcher thread. Any completions left over are processed on
// the next tick after the eventfd is re-armed below.
uint32_t processed = 0;
while (!injected_completions_.empty() && processed < max_injected_completions_per_event_) {
auto& completion = injected_completions_.front();
completion_cb(completion.user_data_, completion.result_, true);
// The socket may closed in the completion_cb and all the related completions are
Expand All @@ -87,6 +93,19 @@ void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) {
break;
}
injected_completions_.pop_front();
++processed;
}

// If we hit the cap with work still queued, write to the eventfd so this callback fires again
// on the next dispatcher tick. The eventfd is non-blocking and we drain it at the top of this
// method, so this is just a self-poke.
if (!injected_completions_.empty()) {
ENVOY_LOG(trace, "injected completion cap reached, {} remaining; re-arming eventfd",
injected_completions_.size());
const eventfd_t v = 1;
int ret = eventfd_write(event_fd_, v);
RELEASE_ASSERT(ret == 0,
fmt::format("failed to re-arm io_uring eventfd: {}", errorDetails(errno)));
}
}

Expand Down
12 changes: 11 additions & 1 deletion source/common/io/io_uring_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,16 @@ class IoUringImpl : public IoUring,
public ThreadLocal::ThreadLocalObject,
protected Logger::Loggable<Logger::Id::io> {
public:
IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling);
// Cap on the number of injected completions processed per event-loop tick.
// Without this cap, a steady stream of injected completions whose callbacks inject more
// completions can stall the dispatcher thread indefinitely. When the cap is reached, the
// remaining injected completions stay queued and the eventfd is re-armed so processing
// resumes on the next tick.
static constexpr uint32_t DefaultMaxInjectedCompletionsPerEvent = 1024;

IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling,
uint32_t max_injected_completions_per_event =
DefaultMaxInjectedCompletionsPerEvent);
~IoUringImpl() override;

os_fd_t registerEventfd() override;
Expand All @@ -52,6 +61,7 @@ class IoUringImpl : public IoUring,
std::vector<struct io_uring_cqe*> cqes_;
os_fd_t event_fd_{INVALID_SOCKET};
std::list<InjectedCompletion> injected_completions_;
const uint32_t max_injected_completions_per_event_;
};

} // namespace Io
Expand Down
49 changes: 49 additions & 0 deletions test/common/io/io_uring_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,55 @@ TEST_F(IoUringImplTest, NestRemoveInjectCompletion) {
waitForCondition(*dispatcher, [&completions_nr]() { return completions_nr == 2; });
}

TEST_F(IoUringImplTest, BoundedInjectedCompletionsPerEvent) {
  // Recreate io_uring_ with a small per-tick cap so we can observe the bound.
  io_uring_ = std::make_unique<IoUringImpl>(2, false, /*max_injected_completions_per_event=*/3);

  auto dispatcher = api_->allocateDispatcher("test_thread");

  os_fd_t event_fd = io_uring_->registerEventfd();
  const Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;

  // All three counters use the same unsigned type: they only ever count up from zero and are
  // compared against each other below.
  uint32_t total_completions = 0;
  uint32_t event_callback_invocations = 0;
  std::vector<uint32_t> per_invocation_counts;

  auto file_event = dispatcher->createFileEvent(
      event_fd,
      [this, &total_completions, &event_callback_invocations,
       &per_invocation_counts](uint32_t) {
        const uint32_t before = total_completions;
        io_uring_->forEveryCompletion(
            [&total_completions](Request*, int32_t, bool injected) {
              EXPECT_TRUE(injected);
              total_completions++;
            });
        // Record how many completions this single tick drained so the per-tick cap can be
        // asserted exactly.
        per_invocation_counts.push_back(total_completions - before);
        event_callback_invocations++;
        return absl::OkStatus();
      },
      trigger, Event::FileReadyType::Read);

  // Inject 7 completions: with cap=3 we expect 3 + 3 + 1 across three event ticks.
  std::array<int, 7> data{1, 2, 3, 4, 5, 6, 7};
  std::vector<std::unique_ptr<TestRequest>> requests;
  for (int& d : data) {
    requests.push_back(std::make_unique<TestRequest>(d));
    io_uring_->injectCompletion(/*fd=*/42, requests.back().get(), -1);
  }

  file_event->activate(Event::FileReadyType::Read);
  waitForCondition(*dispatcher, [&total_completions]() { return total_completions == 7; });

  // Each tick after the first must have been driven by the eventfd self-poke since we never
  // called activate() again — proving the re-arm path works. The eventfd callback above is the
  // only code that increments these counters, so the counts are exact: 3 + 3 + 1 over exactly
  // three invocations.
  EXPECT_EQ(7u, total_completions);
  EXPECT_EQ(3u, event_callback_invocations);
  ASSERT_EQ(3u, per_invocation_counts.size());
  EXPECT_EQ(3u, per_invocation_counts[0]);
  EXPECT_EQ(3u, per_invocation_counts[1]);
  EXPECT_EQ(1u, per_invocation_counts[2]);
}

TEST_F(IoUringImplTest, RegisterEventfd) {
EXPECT_FALSE(io_uring_->isEventfdRegistered());
io_uring_->registerEventfd();
Expand Down