Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions hpc_launcher/cli/torchrun_hpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@

def main():
parser = argparse.ArgumentParser(
description="A wrapper script that launches and runs distributed PyTorch on HPC systems."
description=
"A wrapper script that launches and runs distributed PyTorch on HPC systems."
)
common_args.setup_arguments(parser)
parser.add_argument(
Expand All @@ -57,11 +58,20 @@ def main():
"--unswap-rocr-hip-vis-dev",
action="store_true",
default=False,
help="Undo moving ROCR_VISIBLE_DEVICES into the HIP_VISIBLE_DEVICES env variable. "
help=
"Undo moving ROCR_VISIBLE_DEVICES into the HIP_VISIBLE_DEVICES env variable. "
"In PyTorch codes HIP_VISIBLE_DEVICES is most similar to CUDA_VISIBLE_DEVICES. "
"Ensureing that HIP vs ROCR can improve behavior of HF Accelerate and TorchTitan.",
)

parser.add_argument(
"-m",
"--module",
action="store_true",
default=False,
help="If specified, the command will be interpreted as "
"a Python module (similar to `python -m module ...`).")

# Grab the rest of the command line to launch
# torchrun-hpc does not support running with a pre-generated batch script file
parser.add_argument("command", help="Command to be executed")
Expand All @@ -86,7 +96,9 @@ def main():
if args.job_comm_protocol:
optimize_comm_protocol = args.job_comm_protocol
if optimize_comm_protocol.upper() == "MPI":
logger.warning(f"Using MPI as the primary communication protocol for PyTorch requires additional support")
logger.warning(
f"Using MPI as the primary communication protocol for PyTorch requires additional support"
)
else:
system.job_comm_protocol = "*CCL"
# Pick batch scheduler
Expand Down Expand Up @@ -122,21 +134,22 @@ def main():
)
exit(1)

if args.bg and args.launch_dir is None: # or args.batch_script
if args.bg and args.launch_dir is None: # or args.batch_script
# If running a batch job with no launch directory argument,
# run in the generated timestamped directory
args.launch_dir = ""
if args.launch_dir is None and not args.bg:
args.launch_dir = ""
logger.info(f"torchrun-hpc needs to run jobs from a launch directory -- automatically setting the -l (--launch-dir) CLI argument")
logger.info(
f"torchrun-hpc needs to run jobs from a launch directory -- automatically setting the -l (--launch-dir) CLI argument"
)

_, folder_name = scheduler.create_launch_folder_name(
args.command, "torchrun_hpc", args.launch_dir
)
args.command, "torchrun_hpc", args.launch_dir)

script_file = scheduler.create_launch_folder(
folder_name, not args.bg, args.output_script, args.dry_run
)
script_file = scheduler.create_launch_folder(folder_name, not args.bg,
args.output_script,
args.dry_run)

trampoline_file = "torchrun_hpc_trampoline.py"

Expand All @@ -152,8 +165,11 @@ def main():
launch_args = [
"-u",
f"{os.path.abspath(folder_name)}/{trampoline_file}",
os.path.abspath(args.command),
]
if args.module:
launch_args += ["-m", args.command]
else:
launch_args.append(os.path.abspath(args.command))
launch_args += args.args

logger.info(f"Running job in directory: {folder_name}")
Expand Down
51 changes: 30 additions & 21 deletions hpc_launcher/torch/torchrun_hpc_trampoline.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,41 @@
def main():
# Strip off the name of this script and pass the rest to runpy
args = sys.argv[1:]
if args[0] == "-m":
is_module = True
args = args[1:]
else:
is_module = False

scheduler_type = os.getenv("TORCHRUN_HPC_SCHEDULER")
scheduler = get_schedulers()[scheduler_type]
(world_size, rank, local_world_size, local_rank) = (
scheduler.get_parallel_configuration()
)
(world_size, rank, local_world_size,
local_rank) = (scheduler.get_parallel_configuration())

# Check on the backend and report if the memory size was set
backend = None
device = None
if torch.cuda.is_available():
backend = "nccl"
device = "cuda"
fraction_max_gpu_mem = float(os.getenv("HPC_LAUNCHER_MAX_GPU_MEM", 1.0))
fraction_max_gpu_mem = float(os.getenv("HPC_LAUNCHER_MAX_GPU_MEM",
1.0))
if fraction_max_gpu_mem != 1.0 and rank == 0:
print(
f"[Rank {rank} of {world_size}] TORCHRUN-HPC set the max GPU memory fraction to {fraction_max_gpu_mem}"
)
else:
backend = "gloo"
device="cpu"
device = "cpu"

# Standard operating mode assumes that there is one rank per GPU
# Check to see how many GPUS are actually available to this rank
avail_gpus = 0
gpus = []
for e in ["CUDA_VISIBLE_DEVICES", "ROCR_VISIBLE_DEVICES", "HIP_VISIBLE_DEVICES"]:
for e in [
"CUDA_VISIBLE_DEVICES", "ROCR_VISIBLE_DEVICES",
"HIP_VISIBLE_DEVICES"
]:
if os.getenv(e):
gpus = os.getenv(e)
break
Expand Down Expand Up @@ -97,21 +105,18 @@ def main():
f"[Rank {rank} of {world_size}]: Initializing distributed PyTorch using protocol: {rdv_protocol}"
)
# TODO(later): Fix how we handle CUDA visible devices and MPI bind
dist.init_process_group(
backend, init_method=rdv_protocol, world_size=world_size, rank=rank, device_id=torch.device(device, local_device_id)
)
dist.init_process_group(backend,
init_method=rdv_protocol,
world_size=world_size,
rank=rank,
device_id=torch.device(
device, local_device_id))

if rdv_protocol == "mpi://" and rank == 0:
print(
"[Rank {} of {}]: MPI Version: {}".format(
rank, world_size, MPI.Get_version()
)
)
print(
"[Rank {} of {}]: MPI Implementation: {}".format(
rank, world_size, MPI.Get_library_version()
)
)
print("[Rank {} of {}]: MPI Version: {}".format(
rank, world_size, MPI.Get_version()))
print("[Rank {} of {}]: MPI Implementation: {}".format(
rank, world_size, MPI.Get_library_version()))

# If the world size is only 1, torch distributed doesn't have to be initialized;
# however, the called application may still try to set up torch distributed, so provide env variables
Expand All @@ -130,9 +135,13 @@ def main():
os.environ["MASTER_PORT"] = "23456"

# Note that run_path will prepend the args[0] back onto the sys.argv so it needs to be stripped off first
sys.argv = sys.argv[1:]
sys.argv = sys.argv[1:] if not is_module else sys.argv[2:]

# Run underlying script
runpy.run_path(args[0], run_name="__main__")
if is_module:
runpy.run_module(args[0], run_name="__main__", alter_sys=True)
else:
runpy.run_path(args[0], run_name="__main__")

if dist.is_initialized():
# Deal with destroying the process group here
Expand Down
Empty file added tests/e2e/relimport/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions tests/e2e/relimport/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import sys
from .subfolder.b import g

if __name__ == "__main__":
    # Script/module entry point: parse the first command-line argument as a
    # float, pass it through g() (imported via a relative import above), and
    # print the result on stdout.
    val = float(sys.argv[1])
    print(g(val))
4 changes: 4 additions & 0 deletions tests/e2e/relimport/a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@


def f(value: int) -> int:
    """Return ``value`` incremented by one."""
    return value + 1
Empty file.
4 changes: 4 additions & 0 deletions tests/e2e/relimport/subfolder/b.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from ..a import f

def g(a: float) -> int:
    """Return ``f(int(a + 2.5))``.

    ``a`` is shifted by 2.5, truncated toward zero by ``int()``, and the
    resulting integer is handed to ``f`` (imported from the parent package
    via ``from ..a import f``).
    """
    return f(int(a + 2.5))
78 changes: 78 additions & 0 deletions tests/test_relative_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright (c) 2014-2026, Lawrence Livermore National Security, LLC.
# Produced at the Lawrence Livermore National Laboratory.
# Written by the LBANN Research Team (B. Van Essen, et al.) listed in
# the CONTRIBUTORS file. See the top-level LICENSE file for details.
#
# LLNL-CODE-697807.
# All rights reserved.
#
# This file is part of LBANN: Livermore Big Artificial Neural Network
# Toolkit. For details, see http://software.llnl.gov/LBANN or
# https://github.com/LBANN and https://github.com/LLNL/LBANN.
#
# SPDX-License-Identifier: (Apache-2.0)
import pytest

import subprocess
import shutil
import os
import re
import sys
import shutil

from hpc_launcher.systems import autodetect
from hpc_launcher.systems.lc.sierra_family import Sierra
from hpc_launcher.schedulers import get_schedulers


def test_torchrun_hpc_relimport():
    """Run the ``relimport`` package through ``torchrun-hpc -m`` end to end.

    The test launches ``python -m hpc_launcher.cli.torchrun_hpc ... -m
    relimport 4.75`` from the ``e2e`` directory next to this file and checks
    that the process exits with status 0 and prints ``8`` on stdout.

    The test is skipped when the required launcher is not available, when the
    scheduler reports a one-node allocation, or when ``torch`` cannot be
    imported.
    """
    scheduler_type = "slurm"

    # Decide whether the launcher for the selected scheduler is usable here.
    if scheduler_type == "slurm":
        # Skip when srun is missing, and also when jsrun is present next to it
        # (presumably an LSF system that merely provides srun -- TODO confirm).
        launcher_unavailable = (not shutil.which("srun")
                                or bool(shutil.which("jsrun")))
    elif scheduler_type == "flux":
        launcher_unavailable = (not shutil.which("flux")
                                or not os.path.exists("/run/flux/local"))
    elif scheduler_type == "lsf":
        launcher_unavailable = not shutil.which("jsrun")
    else:
        launcher_unavailable = False
    if launcher_unavailable:
        pytest.skip("No distributed launcher found")

    scheduler = get_schedulers()[scheduler_type]
    num_nodes_in_allocation = scheduler.num_nodes_in_allocation()
    # NOTE(review): the command below only requests one node (-N 1), yet a
    # one-node allocation is skipped as insufficient -- confirm this is intended.
    if num_nodes_in_allocation is not None and num_nodes_in_allocation == 1:
        pytest.skip(
            "Executed inside of an allocation with insufficient resources")

    try:
        import torch  # noqa: F401  (availability probe only)
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        pytest.skip("torch not found")

    cmd = [
        sys.executable,
        "-m",
        "hpc_launcher.cli.torchrun_hpc",
        "-l",
        "-v",
        "-N",
        "1",
        "-n",
        "1",
        # This second "-m" belongs to torchrun-hpc: treat the command as a
        # Python module rather than a script path.
        "-m",
        "relimport",
        "4.75",
    ]
    # Run from the e2e directory so that the "relimport" package is importable.
    cwd = os.path.join(os.path.dirname(__file__), "e2e")
    proc = subprocess.run(cmd, text=True, capture_output=True, cwd=cwd)

    # Include the captured streams in the failure message to ease debugging.
    assert proc.returncode == 0, (
        f"torchrun-hpc exited with {proc.returncode}\n"
        f"stdout:\n{proc.stdout}\nstderr:\n{proc.stderr}")
    assert proc.stdout.strip() == "8", (
        f"unexpected stdout: {proc.stdout!r}\nstderr:\n{proc.stderr}")

    # TODO(review): the launch directory requested with "-l" is not cleaned up
    # here; the previous cleanup was dead code because its path was always None.


if __name__ == "__main__":
    # Allow running this test directly as a script, without the pytest runner.
    test_torchrun_hpc_relimport()