Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
116 changes: 41 additions & 75 deletions meshroom/tractorSubmitter/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,28 +204,30 @@ def cook(self):

class TaskInfo:
def __init__(self, name, cmdArgs, nodeUid, cacheFolder="",
# NOTE(review): this span is GitHub diff residue with the pre-change and
# post-change signatures interleaved: the next three lines are the OLD
# parameter list (rezPackages/expandingTask/chunkParams), the two after
# them are the NEW one (reqPackages/taskType).
environment=None, rezPackages=None,
service=None, licenses=None, tags=None,
expandingTask=False, chunkParams=None):
environment=None, reqPackages=None, service=None,
licenses=None, taskType=None, tags=None):
self.name = name
self.uid = nodeUid
self.taskCommandArgs = cmdArgs
# Env
self.environment = environment or {}
# Rez packages
self.rezPackages = rezPackages or []
# Requested packages
self.reqPackages = reqPackages or []
# self.limits
# Fall back to the DEFAULT_TRACTOR_SERVICE env var when no service given.
self.service = service or os.environ.get("DEFAULT_TRACTOR_SERVICE", "")
self.limits = self.getLimits(licenses)
# Tags
self.tags = tags or {}
self.tags["nodeUid"] = nodeUid

# Expanding / Chunks
# NOTE(review): the next five lines are the OLD expandingTask/chunks
# handling; the NEW revision replaces them with the taskType tuple below.
self.expandingTask = expandingTask
# self.expandingFile = self._setExpandingTaskFile(cacheFolder)
self.chunks = []
if not expandingTask:
self.chunks = self.getChunks(chunkParams)
# taskType is an optional ("kind", iteration) tuple; when omitted the
# task defaults to a placeholder with no iteration. Exactly one of the
# boolean task-kind flags below ends up True.
taskType_, iteration_ = taskType or ("placeholder", None)
self.placeholderTask = (taskType_=="placeholder")
self.expandingTask = (taskType_=="expanding")
self.preprocessTask = (taskType_=="preprocess")
self.postprocessTask = (taskType_=="postprocess")
self.chunkTask = (taskType_=="chunk")
self.iteration = iteration_

@staticmethod
def getLimits(licenses=None):
Expand All @@ -235,21 +237,6 @@ def getLimits(licenses=None):
taskLimits.append(os.environ['DEFAULT_TRACTOR_LIMIT'])
return taskLimits

@staticmethod
def getChunks(chunkParams) -> list[Chunk]:
    """Split the frame range described by ``chunkParams`` into Chunks.

    Args:
        chunkParams: dict with optional keys ``start`` (default -1),
            ``end`` (default -2, i.e. an empty range when unset),
            ``packetSize`` (frames per chunk, default 1) and
            ``ignoreIterations`` (chunk indices to drop). May be None.

    Returns:
        list of ``Chunk(iteration, firstFrame, lastFrame)``, or None when
        ``chunkParams`` is falsy or the resulting frame range is empty.
    """
    it = None
    if chunkParams:
        # Bug fix: only read the dict inside the truthiness guard —
        # previously .get() ran before the check and raised
        # AttributeError when chunkParams was None.
        ignoreIterations = chunkParams.get("ignoreIterations", [])
        start, end = chunkParams.get("start", -1), chunkParams.get("end", -2)
        size = chunkParams.get("packetSize", 1)
        frameRange = list(range(start, end + 1))
        if frameRange:
            # Slice the range into packets of `size` frames; each slice
            # becomes one Chunk spanning its first..last frame.
            slices = [frameRange[i:i + size] for i in range(0, len(frameRange), size)]
            it = [Chunk(i, item[0], item[-1]) for i, item in enumerate(slices)
                  if i not in ignoreIterations]
    return it

def _setExpandingTaskFile(self, cacheFolder):
""" Doesn't work with current python API !
It should be possible starting Tractor 1.7 to give a file path to cmd.expand
Expand All @@ -273,62 +260,41 @@ def _setExpandingTaskFile(self, cacheFolder):
def envkey(self):
# Convert this task's environment mapping into Tractor's envkey format
# via the project helper toTractorEnv. NOTE(review): presumably decorated
# with @property in the full file — the decorator line is not visible in
# this diff hunk; confirm against the original source.
return toTractorEnv(self.environment)

def getExpandWrappedCmd(self):
    """Build the command line for a self-expanding Tractor task.

    Layers three wrappers around the raw task arguments, innermost first:
    chunk creation (``meshroom_createChunks``), rez environment
    resolution, and the Tractor subtask wrapper script, which redirects
    stdout to stderr so stdout carries only subtask definitions.
    """
    # Innermost layer: create the chunks at runtime on the blade.
    chunked = f"meshroom_createChunks --submitter Tractor {self.taskCommandArgs}"
    # Resolve the rez environment around that command.
    resolved = rezWrapCommand(chunked, otherRezPkg=self.rezPackages)
    # Outermost layer: the stream-handling wrapper script.
    # NOTE(review): the env var really is spelled "SCRITPS" in this
    # codebase (same spelling used elsewhere) — do not "fix" it here.
    wrapper = os.path.join(os.environ["MR_SUBMITTERS_SCRITPS"], "tractorSubtaskWrapper.py")
    return f"{sys.executable} {wrapper} {resolved}"

def cook(self):
# Build the Tractor task parameter dict (title/argv/service/metadata).
# NOTE(review): this span is GitHub diff residue — some lines belong to
# the OLD revision (self.chunks / rezPackages / "title": self.name) and
# some to the NEW one (placeholderTask / reqPackages / "title": title).
title = f"{self.name}"
tags = self.tags
cmd = self.taskCommandArgs
# Adjust title, command flags and tags according to the task kind.
if self.preprocessTask:
cmd += f" --preprocess"
title += "_preprocess"
tags["iteration"] = "preprocess"
elif self.postprocessTask:
cmd += f" --postprocess"
title += "_postprocess"
tags["iteration"] = "postprocess"
elif self.chunkTask:
# NOTE(review): assumes self.iteration is an int here (comparison with
# None would raise TypeError) — confirm taskType always carries an
# iteration for chunk tasks.
if self.iteration >= 0:
title += f"_{self.iteration}"
else:
title += f"_0"
cmd += f" --iteration {self.iteration}"
tags["iteration"] = self.iteration

if self.expandingTask:
# Chunks are not created yet so we use the wrapper and the task will expand itself
cmd = self.getExpandWrappedCmd()

elif self.chunks:
# Empty task with multiple commands (sub-tasks) to execute in parallel
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages)
# Wrap with tractor wrapper (will redirect stdout to stderr)
# to make sure stdout only carries the subtask definitions
wrapperModule = "tractorExpander.py"
wrapperPath = os.path.join(os.environ["MR_SUBMITTERS_SCRITPS"], wrapperModule)
cmd = f"{sys.executable} {wrapperPath} {cmd}"
elif self.placeholderTask:
# Placeholder tasks carry no command at all (argv stays None below).
cmd = None
else:
# Simple task with only one command to execute
cmd = f"meshroom_compute {self.taskCommandArgs}"
cmd = rezWrapCommand(cmd, otherRezPkg=self.rezPackages)
cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages)

# NOTE(review): the duplicated "title" key is a diff artifact (old line
# followed by its replacement); only the last one would take effect.
return {
"title": self.name,
"title": title,
"argv": shlex.split(cmd) if cmd else cmd,
"service": self.service,
"metadata": json.dumps(self.tags)
}


class ChunkTaskInfo:
    """Per-chunk task description derived from a node-level TaskInfo.

    Used when the chunks already exist at submission time and there are
    several of them: the submitter process creates the chunk tasks itself.
    The parent TaskInfo describes the node; one ChunkTaskInfo per chunk
    produces the Tractor task parameters for that chunk.
    """

    def __init__(self, taskInfo, chunk):
        # Node-level task description this chunk belongs to.
        self.taskInfo: TaskInfo = taskInfo
        # The frame chunk represented by this instance.
        self.chunk: Chunk = chunk

    def cook(self):
        """Return the Tractor task dict (title/argv/service/metadata) for this chunk."""
        info, chnk = self.taskInfo, self.chunk
        # Per-chunk tags: copy the node tags, then stamp the iteration.
        chunkTags = dict(info.tags)
        chunkTags["iteration"] = chnk.iteration
        # meshroom_compute invocation targeting this chunk's iteration,
        # resolved through rez.
        command = rezWrapCommand(
            f"meshroom_compute {info.taskCommandArgs} --iteration {chnk.iteration}",
            otherRezPkg=info.rezPackages,
        )
        return {
            "title": f"{info.name}_{chnk.start}_{chnk.end}",
            "argv": shlex.split(command),  # command is never None here
            "service": info.service,
            "metadata": json.dumps(chunkTags),
        }
68 changes: 41 additions & 27 deletions meshroom/tractorSubmitter/api/subtaskCreator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
Helper functions to create subtasks

Provides queueSubtask() to write Tractor subtask definitions to stdout.
Works with tractorSubtaskWrapper.py to ensure proper stream handling.
Works with tractorExpander.py to ensure proper stream handling.

Example :
>>> from tractorSubmitter.api.subtaskCreator import queueSubtask
Expand All @@ -17,7 +17,7 @@
import os
import json
import shlex
from tractorSubmitter.api.base import TaskInfo, ChunkTaskInfo
from tractorSubmitter.api.base import TaskInfo


# Original stdout file descriptor
Expand All @@ -38,16 +38,16 @@ def _getCachedSubtaskStdout():
"""
global _stdout
if _stdout is None:
if 'TRACTOR_SUBTASK_STDOUT_FD' in os.environ:
if 'TRACTOR_STDOUT_FD' in os.environ:
try:
fd = int(os.environ['TRACTOR_SUBTASK_STDOUT_FD'])
fd = int(os.environ['TRACTOR_STDOUT_FD'])
# Open the file descriptor for writing
_stdout = os.fdopen(fd, 'w', buffering=1)
except (ValueError, OSError):
raise RuntimeError("(_getCachedSubtaskStdout) Could not open TRACTOR_SUBTASK_STDOUT_FD")
raise RuntimeError("(_getCachedSubtaskStdout) Could not open TRACTOR_STDOUT_FD")
log(f"(_getCachedSubtaskStdout) stdout={_stdout}")
else:
raise FileNotFoundError("(_getCachedSubtaskStdout) Could not find TRACTOR_SUBTASK_STDOUT_FD")
raise FileNotFoundError("(_getCachedSubtaskStdout) Could not find TRACTOR_STDOUT_FD")
return _stdout


Expand Down Expand Up @@ -144,30 +144,44 @@ def queueSubtask(title, argv, service="", limits=None, metadata=None, envkey=Non
log(f"Queued subtask: {title}")


def queueChunkTask(node, cmdArgs, service, tags=None, rezPackages=None, environment=None):
chunkRangeParams = None
def getChunks(chunkParams):
    """Expand ``chunkParams`` into one single-frame Chunk per iteration.

    Args:
        chunkParams: dict with optional keys ``start`` (default -1),
            ``end`` (default -2, i.e. an empty range when unset) and
            ``ignoreIterations`` (iteration indices to skip). May be None.

    Returns:
        list of ``Chunk(iteration, frame, frame)`` — one chunk per frame —
        or None when ``chunkParams`` is falsy or the range is empty.
    """
    it = None
    if chunkParams:
        # Bug fix: read the dict only inside the truthiness guard —
        # previously .get() ran first and crashed on chunkParams=None.
        ignoreIterations = chunkParams.get("ignoreIterations", [])
        start, end = chunkParams.get("start", -1), chunkParams.get("end", -2)
        frameRange = list(range(start, end + 1))
        if frameRange:
            # Fixed packet size of 1: each frame becomes its own chunk
            # (the dead leftover `it = [Chunk(i, )]` assignment and the
            # unused `size = 1` from the diff were dropped).
            # NOTE(review): `Chunk` is not visibly imported in this
            # module — confirm it is in scope in the full file.
            it = [Chunk(i, frame, frame) for i, frame in enumerate(frameRange)
                  if i not in ignoreIterations]
    return it


def queueChunkTask(node, cmdArgs, service, tags=None, reqPackages=None, environment=None):
# Queue one Tractor subtask per parallelization block of `node`.
# NOTE(review): this span is GitHub diff residue — the first TaskInfo(...)
# call (rezPackages/expandingTask/chunkParams) and the
# `for chunk in TaskInfo.getChunks(...)` loop belong to the OLD revision;
# the per-iteration TaskInfo loop further down is the NEW one.
blockSize, fullSize, nbBlocks = node.nodeDesc.parallelization.getSizes(node)
if nbBlocks <= 0:
return
chunkRangeParams = {'start': 0, 'end': nbBlocks - 1, 'step': 1}
licenses = node.nodeDesc._licenses
taskInfo = TaskInfo(
node.name,
cmdArgs,
nodeUid=node._uid,
environment=environment,
rezPackages=rezPackages,
service=service,
licenses=licenses,
tags=tags.copy() if tags else None,
expandingTask=False,
chunkParams=chunkRangeParams
)
for chunk in TaskInfo.getChunks(chunkRangeParams):
chunkInfo = ChunkTaskInfo(taskInfo, chunk)

# NEW revision: one TaskInfo per block index, cooked and queued below.
for iteration in range(nbBlocks):
taskInfo = TaskInfo(
name=node.name,
cmdArgs=cmdArgs,
nodeUid=node._uid,
environment=environment,
reqPackages=reqPackages,
service=service,
licenses=licenses,
taskType=("chunk", iteration),
tags=tags.copy() if tags else None,
)
# title, argv, service, metadata
chunkParams = chunkInfo.cook()
taskArgs = taskInfo.cook()
# limits, envkey
chunkParams['limits'] = taskInfo.limits
chunkParams['envkey'] = taskInfo.envkey
queueSubtask(**chunkParams)
taskArgs['limits'] = taskInfo.limits
taskArgs['envkey'] = taskInfo.envkey
queueSubtask(**taskArgs)
Loading