Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pairtools/_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

def get_logger(name="pairtools"):
# Based on ipython traitlets
global _loggers
# global _loggers

if name not in _loggers:
_loggers[name] = logging.getLogger(name)
Expand Down
323 changes: 186 additions & 137 deletions pairtools/lib/fileio.py
Original file line number Diff line number Diff line change
@@ -1,156 +1,205 @@
import shutil
import pipes
import subprocess
import shlex
import sys
import typing as tp
from dataclasses import dataclass

class ParseError(Exception):
    """Exception type declared by this module; adds nothing to ``Exception``.

    NOTE(review): presumably raised on malformed input -- no raise site is
    visible in this part of the file, confirm against the callers.
    """


def auto_open(path, mode, nproc=1, command=None):
"""Guess the file format from the extension and use the corresponding binary
to open it for reading or writing. If the extension is not known, open the
file as text.
# Formats (file extensions without the dot) that are opened through an
# external tool, mapped to the list of modes listed as allowed for each one.
# CommandFormatter.__post_init__ only checks the keys: an extension that is
# not listed here is opened as a plain file, while a listed one that has no
# matching (format, mode) entry in COMMANDS (e.g. 'bam' in mode 'a') raises
# ValueError.
MODES_TO_FILES_PRESET = {
'bam': ['w', 'r'],
'gz': ['w', 'r', 'a'],
'lz4': ['w', 'r', 'a'],
}

If the binary allows parallel execution, specify the number of threads
with `nproc`.
# Opener commands for every supported (format, mode) pair.
# Each value is a list of candidates in order of preference. A candidate is a
# dict with two keys:
#   'tool'    - executable name that is looked up with shutil.which
#   'command' - command line; '{}' is replaced with the thread count through
#               str.format (commands without '{}' ignore the thread count)
# For samtools the thread count must follow '-@': a bare number would be taken
# as the input file name. Note that samtools counts *additional* threads.
#WE NEED COVER
COMMANDS = {
    ('bam', 'w'): [
        {'tool': 'samtools', 'command': 'samtools view -bS -@ {} -'}
    ],
    ('bam', 'r'): [
        {'tool': 'samtools', 'command': 'samtools view -h'}
    ],
    ('gz', 'w'): [
        {'tool': 'pbgzip', 'command': 'pbgzip -c -n {}'},
        {'tool': 'bgzip', 'command': 'bgzip -c -@ {}'},
        {'tool': 'gzip', 'command': 'gzip -c'}
    ],
    ('gz', 'a'): [
        {'tool': 'pbgzip', 'command': 'pbgzip -c -n {}'},
        {'tool': 'bgzip', 'command': 'bgzip -c -@ {}'},
        {'tool': 'gzip', 'command': 'gzip -c'}
    ],
    ('gz', 'r'): [
        {'tool': 'pbgzip', 'command': 'pbgzip -dc -n {}'},
        {'tool': 'bgzip', 'command': 'bgzip -dc -@ {}'},
        {'tool': 'gzip', 'command': 'gzip -dc'}
    ],
    ('lz4', 'w'): [
        {'tool': 'lz4c', 'command': 'lz4c -cz'}
    ],
    ('lz4', 'a'): [
        {'tool': 'lz4c', 'command': 'lz4c -cz'}
    ],
    ('lz4', 'r'): [
        {'tool': 'lz4c', 'command': 'lz4c -cd'}
    ]
}

If `command` is supplied, use it to open the file instead of auto-guessing.
The command must accept the filename as the last argument, accept input
through stdin and print output into stdout.

Supported extensions and binaries (with comments):
.bam - samtools view (allows parallel writing)
.gz - pbgzip if available, otherwise bgzip
.lz4 - lz4c (does not support parallel execution)
@dataclass
class CommandRunResult:
"""
CommandRunResult represents the outcome of executing a command, encapsulating the standard input, output, and error streams, along with the mode of operation.

# Empty filepath or False provided
if not path or path == "-":
if mode == "r":
return sys.stdin
if mode == "w":
return sys.stdout

if command:
if mode == "w":
t = pipes.Template()
t.append(command, "--")
f = t.open(path, "w")
elif mode == "r":
t = pipes.Template()
t.append(command, "--")
f = t.open(path, "r")
else:
raise ValueError("Unknown mode : {}".format(mode))
return f

elif path.endswith(".bam"):
if shutil.which("samtools") is None:
raise ValueError(
{
"w": "samtools is not found, cannot compress output",
"r": "samtools is not found, cannot decompress input",
}[mode]
)
if mode == "w":
t = pipes.Template()
t.append(
"samtools view -bS {} -".format(
"-@ " + str(nproc - 1) if nproc > 1 else ""
),
"--",
)
f = t.open(path, "w")
elif mode == "r":
t = pipes.Template()
t.append("samtools view -h", "--")
f = t.open(path, "r")
Attributes:
errors (Optional[TextIO]): The standard error stream (stderr) of the process. Can be None if not applicable.
output (Optional[TextIO]): The standard output stream (stdout) of the process. Can be None if not applicable.
input (Optional[TextIO]): The standard input stream (stdin) of the process. Can be None if not applicable.
mode (Optional[Literal['r', 'w', 'a']]): The mode in which the file is opened: 'r' for reading, 'w' for writing, or 'a' for appending.

Properties:
outfile (Optional[TextIO]): Returns the appropriate file-like object based on the mode:
- 'r': Returns the output stream (stdout) for reading.
- 'w' or 'a': Returns the input stream (stdin) for writing or appending.
"""

errors: tp.Optional[tp.TextIO]
output: tp.Optional[tp.TextIO]
input: tp.Optional[tp.TextIO]
mode: tp.Optional[tp.Literal['r', 'w', 'a']]

@property
def outfile(self) -> tp.Optional[tp.TextIO]:
    """Return the stream the caller should use for the stored mode.

    'w' and 'a' give ``input`` (the stream to write into), 'r' gives
    ``output`` (the stream to read from); any other mode gives None.
    """
    if self.mode in ('w', 'a'):
        return self.input
    return self.output if self.mode == 'r' else None


@dataclass
class CommandFormatter():
"""
A class to manage file opening operations with support for various compression formats.

Attributes:
mode (str): Mode in which the file is to be opened ('r', 'w', or 'a').
path (Optional[str]): Path to the target file.
Comment thread
agalitsyna marked this conversation as resolved.
Outdated
command (Optional[Union[List[str], str]]): Custom command for file processing.
nproc (int): Number of threads for multithreaded tools. Defaults to 1.
is_binary (bool): Indicates if the file should be opened in binary mode. Defaults to False.

Methods:
__call__(): Executes the command or opens the file based on the provided parameters. Return pairtools.lib.fileio.CommandRunResult object
"""

mode: tp.Literal['r', 'w', 'a']
path: tp.Optional[str]=None
command: tp.Optional[tp.Union[tp.List[str], str]]=None
nproc: int=1
is_binary: bool=False

# Error text formatter. If a required command-line tool is not found, the user gets an error such as "SomeTool is not found, cannot compress output".
# This method builds that human-readable error text.
@staticmethod
def form_notfounderror_text(searched_tools: tp.List[str], is_read: bool) -> str:
    """Build the "tool(s) not found" message for a failed tool lookup.

    Parameters:
        searched_tools: Names of the tools that were looked up and not found.
        is_read: True when the file was to be read (so the input could not be
            decompressed), False when it was to be written or appended (so
            the output could not be compressed).

    Returns:
        A message such as "samtools is not found, cannot decompress input" or
        "pbgzip, bgzip and gzip are not found, cannot compress output".
    """
    # Reading needs decompression, writing/appending needs compression.
    action = 'decompress input' if is_read else 'compress output'

    if not searched_tools:
        # Nothing was looked up, so there is no tool name to report.
        return f"no suitable tool is found, cannot {action}"

    if len(searched_tools) == 1:
        names, verb = searched_tools[0], 'is'
    else:
        # "a, b and c": commas between all but the last name.
        names = f"{', '.join(searched_tools[:-1])} and {searched_tools[-1]}"
        verb = 'are'
    return f"{names} {verb} not found, cannot {action}"

def __post_init__(self):
    """Derive the file mode and, when no command was given, pick one by extension.

    Sets ``file_mode`` to ``mode`` (with 'b' appended when ``is_binary``).
    If ``command`` is empty, the text after the last '.' in ``path`` selects
    the candidates in COMMANDS, and the first candidate whose tool is found
    with shutil.which becomes ``command``, formatted with ``nproc``.

    Raises:
        ValueError: if the extension is listed in MODES_TO_FILES_PRESET but
            COMMANDS has no entry for this (format, mode) pair, or if none of
            the candidate tools is installed.
    """
    self.__nocommand = False
    self.file_mode = f'{self.mode}b' if self.is_binary else self.mode

    # An explicitly supplied command wins; there is nothing to look up.
    if self.command:
        return

    # No path (None / "") or "-" means stdin/stdout, which __call__ serves
    # without any command. Returning here also keeps a None path from
    # failing on .split below.
    if not self.path or self.path == "-":
        self.__nocommand = True
        return

    self.__format = self.path.split('.')[-1]

    # Unknown extension: the file is opened directly in the given mode.
    if self.__format not in MODES_TO_FILES_PRESET:
        self.__nocommand = True
        return

    # Known format, but no command for this mode.
    candidates = COMMANDS.get((self.__format, self.mode))
    if candidates is None:
        raise ValueError(f'{self.__format} can not to be opened in {self.mode}')

    # Take the first candidate whose tool is installed; remember the misses
    # so the error message can name every tool that was tried.
    missing_tools = []
    for candidate in candidates:
        if shutil.which(candidate['tool']) is not None:
            self.command = candidate['command'].format(str(self.nproc))
            return
        missing_tools.append(candidate['tool'])

    raise ValueError(self.form_notfounderror_text(missing_tools, self.mode=='r'))
Comment thread
agalitsyna marked this conversation as resolved.
Outdated

def __convert_command(self):
    """Store the command in the form that is later handed to subprocess.Popen.

    A string command is tokenized with shlex.split; any other value is
    stored unchanged.
    """
    command = self.command
    self.__command_to_sp = shlex.split(command) if isinstance(command, str) else command

def __form_command(self):
    """Start the opener command with the target file attached to one end.

    In mode 'r' the file feeds the command's stdin and the command's stdout
    is a pipe; in any other mode the command's stdin is a pipe and its stdout
    goes to the file. stderr is always a pipe and the pipes are in text mode.

    Returns:
        The started subprocess.Popen object.
    """
    # The child process gets its own copy of the file descriptor, so the
    # parent's handle is released as soon as the process is started (and
    # also when Popen raises, e.g. because the executable is missing).
    with open(self.path, self.file_mode) as target:
        if self.mode == 'r':
            streams = {'stdin': target, 'stdout': subprocess.PIPE}
        else:
            streams = {'stdin': subprocess.PIPE, 'stdout': target}
        # NOTE(review): stderr is piped but nothing visible here reads it; a
        # tool that writes a lot to stderr could block -- confirm with callers.
        return subprocess.Popen(
            self.__command_to_sp,
            stderr=subprocess.PIPE,
            text=True,
            **streams,
        )

def __call__(self) -> CommandRunResult:
    """Open the target and return its streams wrapped in a CommandRunResult.

    Three cases, checked in this order:
      * empty path or "-": sys.stdout as ``input`` and sys.stdin as ``output``;
      * no command applies: the file itself, opened with ``file_mode``, is
        used as both ``input`` and ``output``;
      * otherwise: the command is started and its stdin/stdout/stderr are
        returned as ``input``/``output``/``errors``.
    """
    if self.path == "-" or not self.path:
        return CommandRunResult(mode=self.mode, input=sys.stdout, output=sys.stdin, errors=None)

    if not self.__nocommand:
        self.__convert_command()
        proc = self.__form_command()
        return CommandRunResult(mode=self.mode, input=proc.stdin, output=proc.stdout, errors=proc.stderr)

    handle = open(self.path, self.file_mode)
    self.__process_file = handle
    return CommandRunResult(mode=self.mode, input=handle, output=handle, errors=None)


def auto_open(path, mode, nproc=1, command=None):
    """
    Open `path` in `mode`, going through an external tool when one applies.

    The work is delegated to CommandFormatter: it is built from the arguments,
    called once, and the `outfile` stream of the result is returned.

    Parameters:
        path (str): Path to the file, or '-' for standard input/output.
        mode (str): File access mode: 'r' (read), 'w' (write), or 'a' (append).
        nproc (int, optional): Number of threads for multithreaded tools. Defaults to 1.
        command (str or list, optional): Custom command used instead of the
            one chosen from the file extension. Defaults to None.

    Returns:
        file-like object: the stream to read from (mode 'r') or to write
        into (modes 'w' and 'a').

    Raises:
        ValueError: If the format cannot be opened in the given mode or none
            of the required tools is found.
    """
    opener = CommandFormatter(mode=mode, path=path, command=command, nproc=nproc)
    return opener().outfile


class PipedIO:
Expand Down