Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ep/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ Notes:
| UCCL_IB_TC | Traffic class in RDMA network | 104/0 (IB/EFA) |
| UCCL_EP_ENABLE_AGGRESSIVE_ATOMIC | Use relaxed atomics with manual `s_waitcnt vmcnt(0)` fences instead of acquire/release semantics. Required on AMD CDNA so the combine receiver actually sees the producer's tail-pointer updates over XGMI; without it the kernel deadlocks at scale. | 1 on AMD, 0 on CUDA |
| UCCL_RDMA_ADAPTIVE_SLEEP | Enable adaptive sleeping on proxy threads: a proxy thread is put to sleep once it has seen no new work requests or RDMA completion events for 120s. | null |
| UCCL_UDP_SPORT_BASE | Base UDP source port for RoCEv2 QPs on mlx5 devices; the main, data, and ACK QPs are assigned consecutive ports starting at this base. Valid range: 1-65535. | 0 (driver decides) |

## Results

Expand Down
1 change: 1 addition & 0 deletions ep/include/proxy_ctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct ProxyCtx {
// std::vector<ibv_cq*> extra_cqs;
ibv_qp* ack_qp = nullptr;
ibv_qp* recv_ack_qp = nullptr;
int udp_sport_base = 0;
// EFA shared-QP model: when true, the QP and ack_recv_buf/mr fields above
// alias another ProxyCtx (typically Proxy::ctx_) and must not be destroyed.
bool qps_are_shared = false;
Expand Down
45 changes: 44 additions & 1 deletion ep/include/rdma_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
#include <mutex>
#include <unordered_set>
#include <vector>
#include <dlfcn.h>
#include <fcntl.h>
#include <limits.h>
#include <sys/socket.h>
#include <unistd.h>

void fill_local_gid(ProxyCtx& S, RDMAConnectionInfo* local_info) {
static inline void fill_local_gid(ProxyCtx& S, RDMAConnectionInfo* local_info) {
if (!S.context) {
fprintf(stderr, "Error: context not initialized when filling GID\n");
exit(1);
Expand Down Expand Up @@ -103,6 +104,48 @@ static inline int get_roce_version_from_env() {
return roce_version;
}

/// Reads the base UDP source port from the UCCL_UDP_SPORT_BASE environment
/// variable.
///
/// The variable must hold a plain decimal integer in [1, 65535]. Anything else
/// (empty string, non-numeric text, trailing characters, out-of-range or
/// overflowing values) is reported on stderr and ignored.
///
/// @return The configured port, or 0 when the variable is unset or rejected.
///         0 means that no source-port override is requested.
static inline int get_udp_sport_base_from_env() {
  constexpr long kMinPort = 1;
  constexpr long kMaxPort = 65535;

  char const* env = getenv("UCCL_UDP_SPORT_BASE");
  if (!env) return 0;

  // strtol instead of atoi: atoi cannot signal malformed input and has
  // undefined behaviour on overflow. On overflow strtol saturates to
  // LONG_MIN/LONG_MAX, both of which fail the range check below.
  char* end = nullptr;
  long const val = strtol(env, &end, 10);
  bool const fully_parsed = (end != env) && (*end == '\0');

  if (!fully_parsed || val < kMinPort || val > kMaxPort) {
    // Echo the raw text so the user sees exactly what was rejected.
    fprintf(stderr,
            "UCCL_UDP_SPORT_BASE=%s out of the valid range [1, 65535], "
            "ignoring\n",
            env);
    return 0;
  }
  return static_cast<int>(val);
}

// Pointer type matching mlx5dv_modify_qp_udp_sport(), which is looked up at
// runtime with dlsym rather than linked directly.
using mlx5dv_modify_qp_udp_sport_fn = int (*)(struct ibv_qp*, uint16_t);

// Returns the address of mlx5dv_modify_qp_udp_sport from libmlx5, or nullptr
// if the library or the symbol cannot be found. The lookup runs once; later
// calls return the cached result (including a cached nullptr on failure).
static inline mlx5dv_modify_qp_udp_sport_fn load_mlx5dv_udp_sport_fn() {
  auto const resolve = []() -> mlx5dv_modify_qp_udp_sport_fn {
    // Try the unversioned name first, then the versioned soname.
    char const* const candidates[] = {"libmlx5.so", "libmlx5.so.1"};
    void* lib = nullptr;
    for (char const* soname : candidates) {
      lib = dlopen(soname, RTLD_NOW);
      if (lib != nullptr) break;
    }
    if (lib == nullptr) {
      fprintf(stderr, "Failed to open libmlx5.so[.1]: %s\n", dlerror());
      return nullptr;
    }

    void* const symbol = dlsym(lib, "mlx5dv_modify_qp_udp_sport");
    if (symbol == nullptr) {
      fprintf(stderr,
              "dlsym failed on mlx5dv_modify_qp_udp_sport: %s, "
              "skipping UDP sport configuration\n",
              dlerror());
      dlclose(lib);
      return nullptr;
    }

    // On success the library handle is intentionally left open so the
    // returned function pointer stays valid.
    return reinterpret_cast<mlx5dv_modify_qp_udp_sport_fn>(symbol);
  };

  static mlx5dv_modify_qp_udp_sport_fn const cached = resolve();
  return cached;
}

static sa_family_t envIbAddrFamily(void) {
sa_family_t family = AF_INET;
char const* env = getenv("UCCL_IB_ADDR_FAMILY");
Expand Down
2 changes: 2 additions & 0 deletions ep/src/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "d2h_queue_host.hpp"
#include "ep_util.hpp"
#include "rdma.hpp"
#include "rdma_util.hpp"
#include "util/util.h"
#include <arpa/inet.h> // for htonl, ntohl
#include <chrono>
Expand Down Expand Up @@ -162,6 +163,7 @@ void Proxy::set_peers_meta(std::vector<PeerMeta> const& peers) {
ctxs_for_all_ranks_.resize(peers.size());
for (size_t i = 0; i < peers.size(); ++i) {
ctxs_for_all_ranks_[i] = std::make_unique<ProxyCtx>();
ctxs_for_all_ranks_[i]->udp_sport_base = get_udp_sport_base_from_env();
}
}

Expand Down
21 changes: 20 additions & 1 deletion ep/src/rdma.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#ifdef USE_DMABUF
#include <condition_variable>
#include <map>
#include <dlfcn.h>
#endif
#include <arpa/inet.h>
#include <netinet/in.h>
Expand Down Expand Up @@ -1338,6 +1337,26 @@ void modify_qp_to_rts(ProxyCtx& S, RDMAConnectionInfo* local_info) {
}

printf("ACK-QP modified to RTS state\n");

if (S.udp_sport_base != 0) {
auto fn = load_mlx5dv_udp_sport_fn();
if (fn) {
auto set_sport = [&](ibv_qp* qp, int sport, char const* name) {
if (fn(qp, (uint16_t)sport))
fprintf(stderr, "Warning: failed to set UDP sport %d on %s QP\n",
sport, name);
};
set_sport(S.qp, S.udp_sport_base, "main");
for (size_t r = 0; r < S.data_qps_by_channel.size(); ++r)
set_sport(S.data_qps_by_channel[r], S.udp_sport_base + r + 1, "data");
set_sport(S.ack_qp, S.udp_sport_base + S.data_qps_by_channel.size() + 1,
"ACK");
set_sport(S.recv_ack_qp,
S.udp_sport_base + S.data_qps_by_channel.size() + 2,
"recv ACK");
printf("UDP source ports set starting from %d\n", S.udp_sport_base);
}
}
}

void post_receive_buffer_for_imm_on_qp(ProxyCtx& S, ibv_qp* qp) {
Expand Down