Skip to content

Commit a8a251d

Browse files
authored
[coro_io][ibverbs] fix async_event_watcher (#1144)
1 parent 087d8c6 commit a8a251d

File tree

4 files changed

+90
-37
lines changed

4 files changed

+90
-37
lines changed

include/ylt/coro_io/ibverbs/ib_device.hpp

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <system_error>
2626
#include <unordered_map>
2727

28+
#include "asio/dispatch.hpp"
2829
#include "asio/ip/address.hpp"
2930
#include "asio/posix/stream_descriptor.hpp"
3031
#include "ib_buffer.hpp"
@@ -334,6 +335,19 @@ class ib_device_t : public std::enable_shared_from_this<ib_device_t> {
334335
return buffer_pool_;
335336
}
336337

338+
struct async_event_watcher_manager_t
339+
: public std::unordered_map<ib_device_t*, std::weak_ptr<ib_device_t>> {
340+
~async_event_watcher_manager_t() {
341+
for (auto& [_, dev] : *this) {
342+
if (auto ptr = dev.lock(); ptr) {
343+
ptr->stop_async_event_watcher();
344+
}
345+
}
346+
}
347+
};
348+
349+
~ib_device_t() { stop_async_event_watcher(); }
350+
337351
private:
338352
int ipv6_addr_v4mapped(const struct in6_addr* a) {
339353
return ((a->s6_addr32[0] | a->s6_addr32[1]) |
@@ -362,6 +376,12 @@ class ib_device_t : public std::enable_shared_from_this<ib_device_t> {
362376
return default_gid_index;
363377
}
364378

379+
template <typename T>
380+
static bool weak_ptrs_equal(const std::weak_ptr<T>& a,
381+
const std::weak_ptr<T>& b) {
382+
return !a.owner_before(b) && !b.owner_before(a);
383+
}
384+
365385
void start_async_event_watcher() {
366386
if (!ctx_) {
367387
return;
@@ -373,32 +393,35 @@ class ib_device_t : public std::enable_shared_from_this<ib_device_t> {
373393
<< "Error, failed to change file descriptor of async event queue\n";
374394
return;
375395
}
376-
377-
auto fd_deleter = [](asio::posix::stream_descriptor* fd) {
378-
fd->release();
379-
delete fd;
380-
};
381-
auto fd =
382-
std::unique_ptr<asio::posix::stream_descriptor, decltype(fd_deleter)>(
383-
new asio::posix::stream_descriptor(
384-
coro_io::get_global_executor()->get_asio_executor(),
385-
ctx_->async_fd));
396+
auto executor = coro_io::get_global_executor();
397+
async_event_watcher_fd_ = std::unique_ptr<asio::posix::stream_descriptor>(
398+
new asio::posix::stream_descriptor(executor->get_asio_executor(),
399+
ctx_->async_fd));
386400
auto listen_event = [](std::weak_ptr<ib_device_t> dev,
387-
auto fd) -> async_simple::coro::Lazy<void> {
401+
coro_io::ExecutorWrapper<>* executor)
402+
-> async_simple::coro::Lazy<void> {
388403
std::error_code ec;
389-
auto name = std::string{dev.lock()->name()};
404+
auto self = dev.lock();
405+
if (self == nullptr) {
406+
co_return;
407+
}
408+
auto self_raw_ptr = self.get();
409+
(*executor->get_data_with_default<async_event_watcher_manager_t>(
410+
"ylt_ib_watch"))[self_raw_ptr] = dev;
411+
auto name = std::string{self->name()};
390412
ELOG_INFO << "start_async_event_watcher of device:" << name << " start";
391413
while (!ec) {
392414
coro_io::callback_awaitor<std::error_code> awaitor;
393-
ec = co_await awaitor.await_resume([&fd](auto handler) {
394-
fd->async_wait(asio::posix::stream_descriptor::wait_read,
395-
[handler](const std::error_code& ec) mutable {
396-
handler.set_value_then_resume(ec);
397-
});
415+
ec = co_await awaitor.await_resume([&self](auto handler) {
416+
self->async_event_watcher_fd_->async_wait(
417+
asio::posix::stream_descriptor::wait_read,
418+
[handler](const std::error_code& ec) mutable {
419+
handler.set_value_then_resume(ec);
420+
});
421+
self = nullptr;
398422
});
399-
400423
if (!ec) {
401-
auto self = dev.lock();
424+
self = dev.lock();
402425
if (!self) {
403426
ELOG_DEBUG
404427
<< "ib_device async event stop listening by close device:"
@@ -412,14 +435,28 @@ class ib_device_t : public std::enable_shared_from_this<ib_device_t> {
412435
<< ec.message() << ",device:" << name;
413436
}
414437
}
438+
auto* table = executor->get_data<async_event_watcher_manager_t>(
439+
"ib_device_async_event_watcher");
440+
if (table) {
441+
auto iter = table->find(self_raw_ptr);
442+
if (iter != table->end() && weak_ptrs_equal(iter->second, dev)) {
443+
table->erase(iter);
444+
}
445+
}
415446
};
416-
listen_event(shared_from_this(), std::move(fd)).start([](auto&& ec) {
417-
});
447+
listen_event(shared_from_this(), executor).via(executor).detach();
448+
}
449+
450+
void stop_async_event_watcher() {
451+
std::error_code ec;
452+
[[maybe_unused]] auto _ = async_event_watcher_fd_->cancel(ec);
453+
async_event_watcher_fd_->release();
418454
}
419455

420456
std::string name_;
421457
std::unique_ptr<ibv_context, ib_deleter> ctx_;
422458
std::unique_ptr<ibv_pd, ib_deleter> pd_;
459+
std::unique_ptr<asio::posix::stream_descriptor> async_event_watcher_fd_;
423460
std::shared_ptr<ib_buffer_pool_t> buffer_pool_;
424461
std::atomic<bool> support_inline_data_ = true;
425462
ibv_port_attr attr_;

include/ylt/coro_io/io_context_pool.hpp

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <iostream>
2929
#include <memory>
3030
#include <mutex>
31+
#include <span>
3132
#include <thread>
3233
#include <type_traits>
3334
#include <utility>
@@ -249,8 +250,8 @@ class io_context_pool {
249250
promise_.set_value();
250251
}
251252

252-
void stop() {
253-
std::call_once(flag_, [this] {
253+
void stop(bool force = false) {
254+
std::call_once(flag_, [this, force] {
254255
bool has_run_or_stop = false;
255256
bool ok = has_run_or_stop_.compare_exchange_strong(has_run_or_stop, true);
256257

@@ -262,15 +263,16 @@ class io_context_pool {
262263

263264
work_.clear();
264265

265-
if (ok) {
266-
// clear all unfinished work
267-
for (auto &e : io_contexts_) {
268-
e->run();
266+
for (auto &e : io_contexts_) {
267+
if (ok) {
268+
e->poll(); // clear all unfinished work
269+
}
270+
if (force) {
271+
e->stop();
269272
}
270-
return;
271273
}
272-
273-
promise_.get_future().wait();
274+
if (!ok)
275+
promise_.get_future().wait();
274276
});
275277
}
276278

@@ -281,6 +283,10 @@ class io_context_pool {
281283

282284
std::size_t pool_size() const noexcept { return io_contexts_.size(); }
283285

286+
std::span<std::unique_ptr<coro_io::ExecutorWrapper<>>> get_all_executor() {
287+
return executors;
288+
}
289+
284290
bool has_stop() const { return work_.empty(); }
285291

286292
size_t current_io_context() { return next_io_context_ - 1; }

include/ylt/coro_rpc/impl/context.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ class context_base {
4444
typename rpc_protocol::req_header &get_req_head() { return self_->req_head_; }
4545

4646
bool check_status() {
47-
auto old_flag = self_->status_.exchange(context_status::start_response);
48-
if (old_flag != context_status::init)
47+
auto expected = context_status::init;
48+
if (!self_->status_.compare_exchange_strong(expected,
49+
context_status::start_response))
4950
AS_UNLIKELY {
5051
ELOG_ERROR << "response message more than one time";
5152
return false;
5253
}
53-
5454
if (self_->has_closed())
5555
AS_UNLIKELY {
5656
ELOG_DEBUG << "response_msg failed: connection has been closed";

src/coro_io/tests/ibverbs/ib_socket_pressure_test.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
struct config_t {
3737
std::size_t buffer_size = 256 * 1024;
3838
std::size_t request_size = 20 * 1024 * 1024 + 1;
39+
std::size_t recv_buffer_cnt = 8;
40+
std::size_t send_buffer_cnt = 4;
3941
int concurrency = 50;
4042
int test_type = 0;
4143
int enable_log = 0;
@@ -44,8 +46,9 @@ struct config_t {
4446
int port = 58110;
4547
int test_time = 10;
4648
};
47-
YLT_REFL(config_t, buffer_size, request_size, concurrency, test_type,
48-
enable_log, enable_server, enable_client, port, test_time);
49+
YLT_REFL(config_t, buffer_size, request_size, recv_buffer_cnt, send_buffer_cnt,
50+
concurrency, test_type, enable_log, enable_server, enable_client, port,
51+
test_time);
4952

5053
config_t config;
5154
std::shared_ptr<coro_io::ib_device_t> g_dev;
@@ -193,7 +196,11 @@ async_simple::coro::Lazy<std::error_code> echo_accept() {
193196

194197
ELOG_INFO << "tcp listening";
195198
while (true) {
196-
coro_io::ib_socket_t soc(coro_io::get_global_executor(), {.device = g_dev});
199+
coro_io::ib_socket_t soc(
200+
coro_io::get_global_executor(),
201+
{.recv_buffer_cnt = (uint16_t)config.recv_buffer_cnt,
202+
.send_buffer_cnt = (uint16_t)config.send_buffer_cnt,
203+
.device = g_dev});
197204
auto ec = co_await coro_io::async_accept(acceptor, soc);
198205

199206
if (ec) [[unlikely]] {
@@ -309,7 +316,10 @@ async_simple::coro::Lazy<std::error_code> echo_client_read_some(
309316
}
310317

311318
async_simple::coro::Lazy<std::error_code> echo_connect() {
312-
coro_io::ib_socket_t soc(coro_io::get_global_executor(), {.device = g_dev});
319+
coro_io::ib_socket_t soc(coro_io::get_global_executor(),
320+
{.recv_buffer_cnt = (uint16_t)config.recv_buffer_cnt,
321+
.send_buffer_cnt = (uint16_t)config.send_buffer_cnt,
322+
.device = g_dev});
313323
auto ec = co_await coro_io::async_connect(soc, config.enable_client,
314324
std::to_string(config.port));
315325
if (ec) [[unlikely]] {

0 commit comments

Comments
 (0)