Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pairtools/_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

def get_logger(name="pairtools"):
# Based on ipython traitlets
global _loggers
# global _loggers

if name not in _loggers:
_loggers[name] = logging.getLogger(name)
Expand Down
272 changes: 130 additions & 142 deletions pairtools/lib/fileio.py
Original file line number Diff line number Diff line change
@@ -1,156 +1,144 @@
import shutil
import pipes
import subprocess
import shlex
import sys
import typing as tp
from dataclasses import dataclass

class ParseError(Exception):
pass


def auto_open(path, mode, nproc=1, command=None):
"""Guess the file format from the extension and use the corresponding binary
to open it for reading or writing. If the extension is not known, open the
file as text.

If the binary allows parallel execution, specify the number of threads
with `nproc`.

If `command` is supplied, use it to open the file instead of auto-guessing.
The command must accept the filename as the last argument, accept input
through stdin and print output into stdout.

Supported extensions and binaries (with comments):
.bam - samtools view (allows parallel writing)
.gz - pbgzip if available, otherwise bgzip
.lz4 - lz4c (does not support parallel execution)
"""

# Empty filepath or False provided
if not path or path == "-":
if mode == "r":
return sys.stdin
if mode == "w":
return sys.stdout

if command:
if mode == "w":
t = pipes.Template()
t.append(command, "--")
f = t.open(path, "w")
elif mode == "r":
t = pipes.Template()
t.append(command, "--")
f = t.open(path, "r")
MODES_TO_FILES_PRESET = {
'bam': ['w', 'r'],
'gz': ['w', 'r', 'a'],
'lz4': ['w', 'r', 'a'],
}

COMMANDS = {
Comment thread
agalitsyna marked this conversation as resolved.
Outdated
('bam', 'w'): [
{'tool': 'samtools', 'command': 'samtools view -bS {} -'}
Comment thread
agalitsyna marked this conversation as resolved.
Outdated
],
('bam', 'r'): [
{'tool': 'samtools', 'command': 'samtools view -h'}
],
('gz', 'w'): [
{'tool': 'pbgzip', 'command': 'pbgzip -c -n {}'},
{'tool': 'bgzip', 'command': 'bgzip -c -@ {}'},
{'tool': 'gzip', 'command': 'gzip -c'}
],
('gz', 'a'): [
{'tool': 'pbgzip', 'command': 'pbgzip -c -n {}'},
{'tool': 'bgzip', 'command': 'bgzip -c -@ {}'},
{'tool': 'gzip', 'command': 'gzip -c'}
],
('gz', 'r'): [
{'tool': 'pbgzip', 'command': 'pbgzip -dc -n {}'},
{'tool': 'bgzip', 'command': 'bgzip -dc -@ {}'},
{'tool': 'gzip', 'command': 'gzip -dc'}
],
('lz4', 'w'): [
{'tool': 'lz4c', 'command': 'lz4c -cz'}
],
('lz4', 'a'): [
{'tool': 'lz4c', 'command': 'lz4c -cz'}
],
('lz4', 'r'): [
{'tool': 'lz4c', 'command': 'lz4c -cd'}
]
}


@dataclass
class CommandRunResult:
errors: tp.Optional[tp.TextIO]
output: tp.Optional[tp.TextIO]
input: tp.Optional[tp.TextIO]
mode: tp.Optional[tp.Literal['r', 'w', 'a']]

@property
def outfile(self) -> tp.Optional[tp.TextIO]:
if self.mode=='r':
return self.input
if self.mode=='w' or self.mode=='a':
return self.output


@dataclass
class CommandFormatter():

mode: tp.Literal['r', 'w', 'a']
path: tp.Optional[str]=None
command: tp.Optional[tp.Union[tp.List[str], str]]=None
nproc: int=1
is_binary: bool=False

@staticmethod
def form_notfounderror_text(searched_tools: tp.List[str], is_read: bool) -> str:
Comment thread
agalitsyna marked this conversation as resolved.
Outdated
tools_article = 'is' if len(searched_tools) == 1 else 'are'
tools_defenition = 'compress output' if is_read else 'decompress input'
tools_list = f'{"", "".join(searched_tools[:-1])} and {searched_tools[-1]}' if len(searched_tools) > 1 else searched_tools[0]
return f"{tools_list} {tools_article} not found, cannot {tools_defenition}"

def __post_init__(self):
self.__nocommand = False
if self.is_binary:
self.file_mode = f'{self.mode}b'
else:
raise ValueError("Unknown mode : {}".format(mode))
return f

elif path.endswith(".bam"):
if shutil.which("samtools") is None:
raise ValueError(
{
"w": "samtools is not found, cannot compress output",
"r": "samtools is not found, cannot decompress input",
}[mode]
)
if mode == "w":
t = pipes.Template()
t.append(
"samtools view -bS {} -".format(
"-@ " + str(nproc - 1) if nproc > 1 else ""
),
"--",
)
f = t.open(path, "w")
elif mode == "r":
t = pipes.Template()
t.append("samtools view -h", "--")
f = t.open(path, "r")
self.file_mode = self.mode

if self.command:
return

self.__format = self.path.split('.')[-1]

if self.__format not in MODES_TO_FILES_PRESET.keys():
self.__nocommand = True
return
if (self.__format, self.mode) not in COMMANDS.keys():
raise ValueError(f'{self.__format} can not to be opened in {self.mode}')

checked_tools = []
for possible_solution in COMMANDS[(self.__format, self.mode)]:
Comment thread
agalitsyna marked this conversation as resolved.
Outdated
if shutil.which(possible_solution['tool']) is None:
checked_tools.append(possible_solution['tool'])
continue
self.command = possible_solution['command'].format(str(self.nproc))
return

raise ValueError(self.form_notfounderror_text(checked_tools, self.mode=='r'))
Comment thread
agalitsyna marked this conversation as resolved.
Outdated

def __convert_command(self):
if isinstance(self.command, str):
self.__command_to_sp = shlex.split(self.command)
else:
raise ValueError("Unknown mode for .bam : {}".format(mode))
return f

elif path.endswith(".gz"):
if shutil.which("pbgzip") is not None:
if mode == "w":
t = pipes.Template()
t.append("pbgzip -c -n {}".format(nproc), "--")
f = t.open(path, "w")
elif mode == "a":
t = pipes.Template()
t.append("pbgzip -c -n {} $IN >> $OUT".format(nproc), "ff")
f = t.open(path, "w")
elif mode == "r":
t = pipes.Template()
t.append("pbgzip -dc -n {}".format(nproc), "--")
f = t.open(path, "r")
else:
raise ValueError("Unknown mode for .gz : {}".format(mode))
elif shutil.which("bgzip") is not None:
if mode == "w":
t = pipes.Template()
t.append("bgzip -c -@ {}".format(nproc), "--")
f = t.open(path, "w")
elif mode == "a":
t = pipes.Template()
t.append("bgzip -c -@ {} $IN >> $OUT".format(nproc), "ff")
f = t.open(path, "w")
elif mode == "r":
t = pipes.Template()
t.append("bgzip -dc -@ {}".format(nproc), "--")
f = t.open(path, "r")
else:
raise ValueError("Unknown mode for .gz : {}".format(mode))
elif shutil.which("gzip") is not None:
if mode == "w":
t = pipes.Template()
t.append("gzip -c", "--")
f = t.open(path, "w")
elif mode == "a":
t = pipes.Template()
t.append("gzip -c $IN >> $OUT", "ff")
f = t.open(path, "w")
elif mode == "r":
t = pipes.Template()
t.append("gzip -dc", "--")
f = t.open(path, "r")
else:
raise ValueError("Unknown mode for .gz : {}".format(mode))
else:
raise ValueError(
{
"w": "pbgzip, bgzip and gzip are not found, cannot compress output",
"a": "pbgzip, bgzip and gzip are is not found, cannot compress output",
"r": "pbgzip, bgzip and gzip are is not found, cannot decompress input",
}[mode]
)
return f
elif path.endswith(".lz4"):
if shutil.which("lz4c") is None:
raise ValueError(
{
"w": "lz4c is not found, cannot compress output",
"a": "lz4c is not found, cannot compress output",
"r": "lz4c is not found, cannot decompress input",
}[mode]
)
if mode == "w":
t = pipes.Template()
t.append("lz4c -cz", "--")
f = t.open(path, "w")
elif mode == "a":
t = pipes.Template()
t.append("lz4c -cz $IN >> $OUT", "ff")
f = t.open(path, "w")
elif mode == "r":
t = pipes.Template()
t.append("lz4c -cd", "--")
f = t.open(path, "r")
self.__command_to_sp = self.command

def __form_command(self):
self.__process_file = open(self.path, self.file_mode)
if self.mode == 'r':
cmd=subprocess.Popen(self.__command_to_sp, stdin=self.__process_file, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
raise ValueError("Unknown mode : {}".format(mode))
return f
else:
return open(path, mode)
cmd=subprocess.Popen(self.__command_to_sp, stdout=self.__process_file, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
return cmd

def __call__(self):
if not self.path or self.path == "-":
Comment thread
agalitsyna marked this conversation as resolved.
return CommandRunResult(input=sys.stdin, errors=None, output=sys.stdout, mode=self.mode)
if self.__nocommand:
self.__process_file = open(self.path, self.file_mode)
if self.mode=='r':
return CommandRunResult(input=self.__process_file, errors=None, output=None, mode=self.mode)
return CommandRunResult(input=None, errors=None, output=self.__process_file, mode=self.mode)
self.__convert_command()
process = self.__form_command()
return CommandRunResult(input=process.stdin, errors=process.stderr, output=process.stdout, mode=self.mode)


def auto_open(path, mode, nproc=1, command=None):
command = CommandFormatter(mode=mode, path=path, command=command, nproc=nproc)
result = command()
return result.outfile


class PipedIO:
Expand Down