Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dpdispatcher/entrypoints/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,21 @@ def submission_args() -> Argument:
"work_base",
dtype=str,
optional=False,
doc="Base directory for the work",
doc="Base directory for the work. Combined with machine.local_root to locate task directories.",
Comment thread
njzjz-bot marked this conversation as resolved.
Outdated
),
Argument(
"forward_common_files",
dtype=List[str],
optional=True,
default=[],
doc="Common files to forward to the remote machine",
doc="Files shared by all tasks and uploaded from work_base before execution.",
),
Argument(
"backward_common_files",
dtype=List[str],
optional=True,
default=[],
doc="Common files to backward from the remote machine",
doc="Files shared by all tasks and downloaded back to work_base after execution.",
),
machine_args,
resources_args,
Expand Down
21 changes: 15 additions & 6 deletions dpdispatcher/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,17 +407,26 @@ def gen_command_env_cuda_devices(self, resources):
@classmethod
def arginfo(cls):
# TODO: change the possible value of batch and context types after we refactor the code
doc_batch_type = "The batch job system type. Option: " + ", ".join(cls.options)
doc_batch_type = "Batch backend used to execute jobs. Option: " + ", ".join(
cls.options
)
doc_context_type = (
"The connection used to remote machine. Option: "
"Execution context / connection type used to reach the execution environment. Option: "
+ ", ".join(BaseContext.options)
)
doc_local_root = "The dir where the tasks and relating files locate. Typically the project dir."
doc_remote_root = "The dir where the tasks are executed on the remote machine. Only needed when context is not lazy-local."
doc_local_root = (
"Local project root used by DPDispatcher to find task directories and local files. "
"submission.work_base is resolved inside this directory."
Comment thread
njzjz-bot marked this conversation as resolved.
Outdated
)
doc_remote_root = (
"Remote root directory used by non-local contexts such as SSH. DPDispatcher creates and uses a "
"submission-specific working directory beneath this root on the remote side. For SSHContext, this path should be absolute."
)
doc_clean_asynchronously = (
"Clean the remote directory asynchronously after the job finishes."
"Clean the remote working directory asynchronously after the job finishes. Avoid enabling this while debugging, "
"because it can remove remote artifacts before you inspect them."
)
doc_retry_count = "Number of retries to resubmit failed jobs."
doc_retry_count = "How many times DPDispatcher will retry a failed job before raising an error."

machine_args = [
Argument("batch_type", str, optional=False, doc=doc_batch_type),
Expand Down
100 changes: 68 additions & 32 deletions dpdispatcher/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,15 +685,33 @@ def serialize(self):
@staticmethod
def arginfo():
doc_command = (
"A command to be executed of this task. The expected return code is 0."
"Shell command executed for this task. A zero exit code is treated as success. "
"If the real application may fail before useful artifacts are synchronized, consider "
"wrapping it and saving diagnostics to files that are listed in backward_files."
)
doc_task_work_path = (
"Working directory of this task, resolved relative to submission.work_base. "
"For the smallest local example, use '.'. If you use a subdirectory such as 'task1/', "
"the command runs inside that subdirectory."
)
Comment thread
njzjz-bot marked this conversation as resolved.
Outdated
doc_task_work_path = "The dir where the command to be executed."
doc_forward_files = (
"The files to be uploaded in task_work_path before the task exectued."
"Files to upload for this task before execution. Paths are resolved relative to this "
"task's task_work_path. Put per-task inputs here; files shared by all tasks belong in "
"submission.forward_common_files."
)
doc_backward_files = (
"Files to download for this task after execution. Paths are collected from this task's "
"task_work_path on the execution side and synchronized back to the same relative task "
"directory under local_root/work_base."
Comment thread
njzjz-bot marked this conversation as resolved.
Outdated
)
doc_outlog = (
"Filename for stdout redirection inside task_work_path. The local result is typically "
"available under local_root/work_base/task_work_path/outlog after synchronization."
)
doc_errlog = (
"Filename for stderr redirection inside task_work_path. The local result is typically "
"available under local_root/work_base/task_work_path/errlog after synchronization."
)
doc_backward_files = "The files to be download to local_root in task_work_path after the task finished"
doc_outlog = "The out log file name. redirect from stdout"
doc_errlog = "The err log file name. redirect from stderr"

task_args = [
Argument("command", str, optional=False, doc=doc_command),
Expand Down Expand Up @@ -1157,37 +1175,55 @@ def load_from_dict(cls, resources_dict: dict, allow_ref: bool = False):

@staticmethod
def arginfo(detail_kwargs=True):
doc_number_node = "The number of nodes required for each `job`."
doc_cpu_per_node = "CPU numbers of each node assigned to each job."
doc_gpu_per_node = "GPU numbers of each node assigned to each job."
doc_queue_name = "The queue name of batch job scheduler system."
doc_group_size = "The number of `tasks` in a `job`. 0 means infinity."
doc_custom_flags = "The extra lines pass to job submitting script header"
doc_para_deg = "Decide how many tasks will be run in parallel."
doc_source_list = "The env file to be sourced before the command execution."
doc_module_purge = (
"Remove all modules on HPC system before module load (module_list)"
doc_number_node = "Number of nodes requested for each scheduler job generated by DPDispatcher."
doc_cpu_per_node = (
"Number of CPUs requested on each node for each scheduler job."
)
doc_gpu_per_node = (
"Number of GPUs requested on each node for each scheduler job."
)
doc_queue_name = (
"Queue or partition name used by the selected batch system. For local Shell runs this is "
"usually an empty string; for Slurm it typically maps to a partition."
)
doc_group_size = (
"How many tasks are packed into one scheduler job. For example, 20 tasks with group_size=5 "
"are typically split into 4 jobs. Use 1 for the simplest one-task workflow. 0 means no "
"explicit upper limit in the grouping logic."
)
doc_custom_flags = (
"Extra scheduler-header lines inserted into the generated submission script, typically for "
"backend-specific options that are not covered by the standard fields."
)
doc_para_deg = (
"How many tasks inside one generated job are run in parallel. This is different from group_size: "
"group_size controls how many tasks are bundled into a job, while para_deg controls concurrency "
"within that job. Keep para_deg=1 for the safest default."
)
doc_module_unload_list = (
"The modules to be unloaded on HPC system before submitting jobs"
doc_source_list = (
"Shell scripts or environment files sourced before task commands run. Useful on HPC systems for "
"activating software stacks explicitly instead of relying on login-shell defaults."
)
doc_module_list = (
"The modules to be loaded on HPC system before submitting jobs"
doc_module_purge = "Whether to run 'module purge' before applying module_unload_list and module_list. Mainly useful on HPC systems."
doc_module_unload_list = "Modules to unload before loading the requested modules. Mainly relevant on HPC systems with environment modules."
doc_module_list = "Modules to load before executing tasks. Mainly relevant on HPC systems with environment modules."
doc_envs = "Environment variables exported before executing tasks."
doc_prepend_script = "Optional shell lines inserted before task commands in the generated job script."
doc_append_script = "Optional shell lines inserted after task commands in the generated job script."
doc_wait_time = (
"Delay in seconds inserted after a job is submitted or resubmitted. Usually keep 0 unless the "
"scheduler/site asks you to throttle submission pace."
)
doc_envs = "The environment variables to be exported on before submitting jobs"
doc_prepend_script = "Optional script run before jobs submitted."
doc_append_script = "Optional script run after jobs submitted."
doc_wait_time = "The waitting time in second after a single `task` submitted"
doc_if_cuda_multi_devices = (
"If there are multiple nvidia GPUS on the node, and we want to assign the tasks to different GPUS."
"If true, dpdispatcher will manually export environment variable CUDA_VISIBLE_DEVICES to different task."
"Usually, this option will be used with Task.task_need_resources variable simultaneously."
"If a node has multiple NVIDIA GPUs, assign different tasks inside the same job to different GPUs "
"by setting CUDA_VISIBLE_DEVICES automatically. Usually used together with para_deg > 1 and task-level "
"resource awareness."
)
doc_ratio_unfinished = "The ratio of `tasks` that can be unfinished."
doc_customized_script_header_template_file = (
"The customized template file to generate job submitting script header, "
"which overrides the default file."
doc_ratio_unfinished = (
"Maximum fraction of tasks allowed to remain unfinished when evaluating job completion. Use 0.0 for the "
"strict default that requires every task to finish."
)
doc_customized_script_header_template_file = "Custom template file for the scheduler-header portion of generated submission scripts. Overrides the default template."

strategy_args = [
Argument(
Expand All @@ -1211,7 +1247,7 @@ def arginfo(detail_kwargs=True):
doc=doc_customized_script_header_template_file,
),
]
doc_strategy = "strategies we use to generation job submitting scripts."
doc_strategy = "Strategy options that affect how DPDispatcher generates and evaluates submission scripts."
strategy_format = Argument(
"strategy", dict, strategy_args, optional=True, doc=doc_strategy
)
Expand Down
Loading