Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
BasedOnStyle: LLVM
IndentWidth: 4
ColumnLimit: 100
AllowShortIfStatementsOnASingleLine: false
FixNamespaceComments: true
SortIncludes: true
IncludeBlocks: Preserve
25 changes: 25 additions & 0 deletions .clang-tidy
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Checks: >
bugprone-*,
performance-*,
readability-*,
portability-*,
modernize-*,
clang-analyzer-*,
-readability-identifier-length,
-modernize-macro-to-enum,
-modernize-deprecated-headers,
-performance-no-int-to-ptr,
-readability-isolate-declaration,
-readability-magic-numbers,
-readability-function-cognitive-complexity,
-bugprone-easily-swappable-parameters,
-bugprone-multi-level-implicit-pointer-conversion,
-modernize-use-using,
-modernize-use-trailing-return-type,
-bugprone-assignment-in-if-condition,
-clang-analyzer-security.insecureAPI.*,
-bugprone-implicit-widening-of-multiplication-result,
-readability-non-const-parameter,
-bugprone-narrowing-conversions
HeaderFilterRegex: '.*'
FormatStyle: file
46 changes: 46 additions & 0 deletions .github/workflows/codestyle.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Code-style

on:
pull_request:
branches: [ main ]

env:
CLANG_VER: 17
GIT_CF: https://raw.githubusercontent.com/llvm/llvm-project/release/11.x/clang/tools/clang-format/git-clang-format

jobs:
style:
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Install clang tools
run: |
sudo apt-get update
sudo apt-get install -y --no-install-recommends wget gpg
# Setup LLVM repository
sudo mkdir -p /etc/apt/keyrings
wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo gpg --dearmor -o /etc/apt/keyrings/llvm.gpg
echo "deb [signed-by=/etc/apt/keyrings/llvm.gpg] http://apt.llvm.org/jammy/ llvm-toolchain-jammy-${CLANG_VER} main" | sudo tee /etc/apt/sources.list.d/llvm.list
sudo apt-get update
sudo apt-get install -y --no-install-recommends clang-tidy-${CLANG_VER} bear clang-${CLANG_VER} clang++-${CLANG_VER}
curl -OL $GIT_CF && chmod +x ./git-clang-format && sudo mv ./git-clang-format /usr/bin/git-clang-format

- name: Check code format
run: |
set -eE
echo "Commit ${{ github.event.pull_request.base.sha }}"
diff=`git clang-format --binary=clang-format-${CLANG_VER} --style=file --diff ${{ github.event.pull_request.base.sha }}`
if [ "$diff" = "no modified files to format" ] || [ "$diff" = "clang-format did not modify any files" ]
then
echo "Format check PASS"
else
echo "Please check code format:"
echo "$diff"
fi

- name: Run clang-tidy
run: |
bear -- make -j$(nproc) libmpi.so
run-clang-tidy-${CLANG_VER} -p . -quiet
41 changes: 21 additions & 20 deletions backends/self/self_backend.cpp
Original file line number Diff line number Diff line change
@@ -1,55 +1,56 @@
#include <cstddef>
#include <cstring>
#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstring>
#include <vector>

using namespace std;

extern "C" {
#include "mpi.h"
#include "util.h"
#include "mpi.h"
#include "util.h"
}

class Send {
public:
public:
const void *userbuf;
void *libbuf;
int count;
MPI_Datatype datatype;
int tag; // TODO: handle tag, just change map to <pair<tag, comm>, vector<Send*>>

Send(const void *userbuf, int count, MPI_Datatype datatype, int tag, MPI_Comm comm) : userbuf(userbuf), count(count), datatype(datatype), tag(tag) {
size_t msg_size = count * nanompi_get_dtype_size(datatype);
Send(const void *userbuf, int count, MPI_Datatype datatype, int tag, MPI_Comm comm)
: userbuf(userbuf), count(count), datatype(datatype), tag(tag) {
auto msg_size = count * nanompi_get_dtype_size(datatype);
libbuf = malloc(msg_size);
memcpy(libbuf, userbuf, msg_size);
}

~Send() {
free(libbuf);
}
~Send() { free(libbuf); }
};

unordered_map<MPI_Comm, vector<Send*>> map;
unordered_map<MPI_Comm, vector<Send *>> map;

extern "C" int nanompi_self_send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
{
extern "C" int nanompi_self_send(const void *buf, int count, MPI_Datatype datatype, int dest,
int tag, MPI_Comm comm) {
if (dest != comm->my_rank) {
cout << "error: rank using self send but not to itself!" << endl;
cout << "error: rank using self send but not to itself!\n";
}

map[comm].push_back(new Send(buf, count, datatype, tag, comm));

return MPI_SUCCESS;
}

extern "C" int nanompi_self_recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm)
{
vector<Send*> &sendq = map[comm];
vector<Send*>::iterator iter;
extern "C" int nanompi_self_recv(void *buf, int count, MPI_Datatype datatype, int source, int tag,
MPI_Comm comm) {
vector<Send *> &sendq = map[comm];
vector<Send *>::iterator iter;

for (iter = sendq.begin(); iter != sendq.end(); ) {
for (iter = sendq.begin(); iter != sendq.end();) {
if ((*iter)->count <= count && (*iter)->datatype.id == datatype.id) {
memcpy(buf, (*iter)->libbuf, (*iter)->count * nanompi_get_dtype_size((*iter)->datatype));
memcpy(buf, (*iter)->libbuf,
(*iter)->count * nanompi_get_dtype_size((*iter)->datatype));
delete *iter;
sendq.erase(iter);
return MPI_SUCCESS;
Expand Down
100 changes: 50 additions & 50 deletions backends/socket/socket_backend.c
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <errno.h>

#include "socket_backend.h"
#include "constants.h"
#include "socket_backend.h"

#include "util.h"
#include "comm.h"
#include "util.h"


static int init_server(nanompi_communicator_t *comm)
{
static int init_server(nanompi_communicator_t *comm) {
struct sockaddr_in address;
int status = MPI_SUCCESS;
int rank = comm->my_rank;
Expand All @@ -34,24 +32,25 @@ static int init_server(nanompi_communicator_t *comm)
}

// Try to reuse addr and port to avoid pesky bind: address in use errors >:(
// This is especially useful right now because the job launcher does not forward Ctrl+C signals to each process.
// That means after every run that errors or is Ctrl+C'd out, we get more stray processes
// This is especially useful right now because the job launcher does not forward Ctrl+C signals
// to each process. That means after every run that errors or is Ctrl+C'd out, we get more stray
// processes
// TODO: It seems like this doesn't always work
if (setsockopt(comm->socket_info.server_fd, SOL_SOCKET,
SO_REUSEADDR | SO_REUSEPORT, &opt,
if (setsockopt(comm->socket_info.server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt,
sizeof(opt))) {
perror("setsockopt");
status = MPI_ERR_OTHER;
goto close;
}

// Turn off Nagle's algorithm
// Nagle's algorithm is an OS-level buffering solution. The idea is for chat-like applications where a user
// is typing in one byte at a time, the socket can coalesce (i.e., merge multiple small sends into one large send)
// these bytes to massively increase throughput by reducing the amount of control messages going back and forth.
// We want it off so that for small messages, it doesn't wait for the timeout before sending, messing up the
// latency measurement.
if (setsockopt(comm->socket_info.server_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &opt, sizeof(opt))) {
// Nagle's algorithm is an OS-level buffering solution. The idea is for chat-like applications
// where a user is typing in one byte at a time, the socket can coalesce (i.e., merge multiple
// small sends into one large send) these bytes to massively increase throughput by reducing the
// amount of control messages going back and forth. We want it off so that for small messages,
// it doesn't wait for the timeout before sending, messing up the latency measurement.
if (setsockopt(comm->socket_info.server_fd, IPPROTO_TCP, TCP_NODELAY, (char *)&opt,
sizeof(opt))) {
perror("setsockopt");
status = MPI_ERR_OTHER;
goto close;
Expand All @@ -63,7 +62,8 @@ static int init_server(nanompi_communicator_t *comm)

if (bind(comm->socket_info.server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
char bind_msg[1024] = {'\0'};
snprintf( bind_msg, 1023, "function %s, file %s, line %d, rank %d, port %d, bind error", __func__, __FILE__, __LINE__, rank, grp_proc_pointers[rank]->port);
snprintf(bind_msg, 1023, "function %s, file %s, line %d, rank %d, port %d, bind error",
__func__, __FILE__, __LINE__, rank, grp_proc_pointers[rank]->port);
perror(bind_msg);
status = MPI_ERR_OTHER;
goto close;
Expand All @@ -75,17 +75,15 @@ static int init_server(nanompi_communicator_t *comm)
goto close;
}

for(i = rank - 1; i >= 0; i--) {
// Here "address" is a wildcard--meaning accept a connection from any address. However, since connect/accept is blocking and we "connect"
// to the next higher/lower rank, we fill out client_fds in rank order
// e.g. ranks are 0 1 2
// 0 connects to 1 and blocks
// 1 connects to 2 and blocks
// 2 accepts 1, unblocking 1. 1 is in rank order in 2's client_fds
// 2 accepts 0, 0 is in rank order in 2's client_fds
// 1 accepts 0, 0 is in rank order in 1's client_fds
for (i = rank - 1; i >= 0; i--) {
// Here "address" is a wildcard--meaning accept a connection from any address. However,
// since connect/accept is blocking and we "connect" to the next higher/lower rank, we fill
// out client_fds in rank order e.g. ranks are 0 1 2 0 connects to 1 and blocks 1 connects
// to 2 and blocks 2 accepts 1, unblocking 1. 1 is in rank order in 2's client_fds 2 accepts
// 0, 0 is in rank order in 2's client_fds 1 accepts 0, 0 is in rank order in 1's client_fds
// TODO: is this explanation necessary?
comm->socket_info.client_fds[i] = accept(comm->socket_info.server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
comm->socket_info.client_fds[i] =
accept(comm->socket_info.server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen);
}

exit:
Expand All @@ -95,8 +93,7 @@ static int init_server(nanompi_communicator_t *comm)
goto exit;
}

static int init_clients(nanompi_communicator_t *comm)
{
static int init_clients(nanompi_communicator_t *comm) {
int status = MPI_SUCCESS;
int rank = comm->my_rank;
int size = comm->local_group->grp_proc_count;
Expand All @@ -107,13 +104,13 @@ static int init_clients(nanompi_communicator_t *comm)
struct sockaddr_in *sa_in;

memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET; // IPv4
hints.ai_family = AF_INET; // IPv4

// Make a client socket for every rank greater than mine. Last rank just returns
// without doing anything
for(i = rank + 1; i < size; i++) {
for (i = rank + 1; i < size; i++) {
// Get ip of hostname (resolve dns)
if (err = getaddrinfo(grp_proc_pointers[i]->hostname, NULL, &hints, &res) != 0) {
if ((err = getaddrinfo(grp_proc_pointers[i]->hostname, NULL, &hints, &res)) != 0) {
printf("getaddrinfo failed: %s\n", gai_strerror(err));
goto free;
}
Expand All @@ -128,7 +125,9 @@ static int init_clients(nanompi_communicator_t *comm)
sa_in->sin_port = htons(grp_proc_pointers[i]->port);

// This client may have reached here before the server called accept(), so just keep trying
while (connect(comm->socket_info.client_fds[i], res->ai_addr, res->ai_addrlen));
while (connect(comm->socket_info.client_fds[i], res->ai_addr, res->ai_addrlen)) {
;
}

freeaddrinfo(res);
res = NULL;
Expand All @@ -140,22 +139,23 @@ static int init_clients(nanompi_communicator_t *comm)
if (res) {
freeaddrinfo(res);
}
for(i--; i >= rank + 1; i--) {
for (i--; i >= rank + 1; i--) {
close(comm->socket_info.client_fds[i]);
}
status = MPI_ERR_OTHER;
goto exit;
}

int nanompi_init_socket_backend(nanompi_communicator_t *comm)
{
int nanompi_init_socket_backend(nanompi_communicator_t *comm) {
int status = MPI_SUCCESS;
int size = comm->local_group->grp_proc_count;

comm->socket_info.client_fds = (int*) malloc(sizeof(int) * size);
comm->socket_info.client_fds = (int *)malloc(sizeof(int) * size);
assert(comm->socket_info.client_fds);
if (!comm->socket_info.client_fds) {
PRINT_STDERR("Error mallocing client fds\n");
status = MPI_ERR_OTHER;
goto exit;
}

status = init_clients(comm);
Expand All @@ -177,11 +177,9 @@ int nanompi_init_socket_backend(nanompi_communicator_t *comm)
goto exit;
}

int nanompi_free_socket_backend(nanompi_communicator_t *comm)
{
int nanompi_free_socket_backend(nanompi_communicator_t *comm) {
int status = MPI_SUCCESS;
int size = comm->local_group->grp_proc_count;
int rank = comm->my_rank;
int i;

for (i = 0; i < size; i++) {
Expand All @@ -194,25 +192,27 @@ int nanompi_free_socket_backend(nanompi_communicator_t *comm)
return status;
}

int nanompi_socket_send(const void *buffer, size_t msg_size, int to_rank, nanompi_communicator_t *comm)
{
int nanompi_socket_send(const void *buffer, size_t msg_size, int to_rank,
nanompi_communicator_t *comm) {
int status = MPI_SUCCESS;
size_t sent_bytes = 0;

while (sent_bytes != msg_size) {
sent_bytes += send(comm->socket_info.client_fds[to_rank], buffer + sent_bytes, msg_size - sent_bytes, 0);
sent_bytes += send(comm->socket_info.client_fds[to_rank], buffer + sent_bytes,
msg_size - sent_bytes, 0);
}

return status;
}

int nanompi_socket_recv(void *buffer, size_t msg_size, int from_rank, nanompi_communicator_t *comm)
{
int nanompi_socket_recv(void *buffer, size_t msg_size, int from_rank,
nanompi_communicator_t *comm) {
int status = MPI_SUCCESS;
size_t recv_bytes = 0;

while (recv_bytes != msg_size) {
recv_bytes += recv(comm->socket_info.client_fds[from_rank], buffer + recv_bytes, msg_size - recv_bytes, 0);
recv_bytes += recv(comm->socket_info.client_fds[from_rank], buffer + recv_bytes,
msg_size - recv_bytes, 0);
}

return status;
Expand Down
Loading