Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions phase/src/caller/caller_header.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ class caller {
//FILE I/O
void print_ref_panel_info(const std::string ref_string);
void read_files_and_initialise();
//Single attempt at reading the binary reference panel (haplotype_set H +
//variant_map V). Returns true on success; on failure fills err_msg and returns
//false so the caller can retry transient localization/read failures (panel
//streamed or localized from a cloud filesystem). Each call re-opens the file and
//boost overwrites H/V, so a retry restarts the read from a clean archive. When the
//failure is non-retryable (e.g. a GLIMPSE/boost version mismatch), non_retryable is
//set to true so the caller can stop immediately instead of burning its retries.
bool read_binary_reference_panel(const std::string& reference_filename, std::string& err_msg, bool& non_retryable);
void setup_mpileup();
void read_BAMs();

Expand Down
100 changes: 86 additions & 14 deletions phase/src/caller/caller_initialise.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
#include <io/gmap_reader.h>
#include <io/genotype_bam_caller.h>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/archive/archive_exception.hpp>
#include <containers/glimpse_mpileup.h>
#include <io/retry_io.h>
#include <chrono>

void caller::print_ref_panel_info(const std::string ref_string)
{
Expand Down Expand Up @@ -87,20 +90,18 @@ void caller::read_files_and_initialise() {
vrb.wait(" * Binary reference panel parsing");
tac.clock();
{
std::ifstream ifs(reference_filename, std::ios::binary | std::ios_base::in);
if (!ifs.good()) vrb.error("Reading binary reference panel file: [" + reference_filename + "]. File not good(): eofbit, failbit or badbit set or file not found.");
try
{
boost::archive::binary_iarchive ia(ifs);
ia >> H;
ia >> V;
} catch (std::exception& e ) {
std::stringstream err_str;
err_str <<"problems reading the binary reference panel (exception triggered by boost archive). Please ensure you are using the same GLIMPSE and boost library version";
err_str << e.what();
vrb.error(err_str.str());
}
if (H.Ypacked.size()==0) vrb.error("Problem reading binary file format. Empty PBWT detected.");
//Localizing/reading the binary reference panel can transiently fail when the
//panel is streamed or localized from a cloud filesystem. Retry the whole read
//(open + deserialize + sanity check) a few times with exponential backoff. Each
//attempt re-opens the file and boost overwrites H/V, so every retry restarts
//from a clean archive.
vrb.bullet("Localizing binary reference panel [" + reference_filename + "] via retry-enabled read");
retry_with_backoff("reading binary reference panel [" + reference_filename + "]", 3, std::chrono::seconds(1), [&]() -> attempt_result {
std::string err_msg;
bool non_retryable = false;
const bool ok = read_binary_reference_panel(reference_filename, err_msg, non_retryable);
return { ok, non_retryable, err_msg };
});

vrb.bullet("Binary reference panel parsing [done] (" + stb.str(tac.rel_time()*1.0/1000, 2) + "s)");
print_ref_panel_info("Binary");
Expand Down Expand Up @@ -174,6 +175,77 @@ void caller::read_files_and_initialise() {

}

bool caller::read_binary_reference_panel(const std::string& reference_filename, std::string& err_msg, bool& non_retryable)
{
err_msg.clear();
non_retryable = false;

//(1) Open the file. A failure to open is not a transient localization hiccup: on the
//common case (a missing or mistyped --reference path) retrying just delays the
//failure, so report it precisely and mark it non_retryable to fail fast (matching the
//pre-retry behaviour). Transient localization failures surface later, as read errors
//during deserialization, not as a failed open.
std::ifstream ifs(reference_filename, std::ios::binary | std::ios_base::in);
if (!ifs.good())
{
err_msg = "could not open file (not good(): eofbit, failbit or badbit set, or file not found). Please check the path.";
non_retryable = true;
return false;
}

//(2) Deserialize the archive. boost overwrites H/V in place, so a failed attempt
//here leaves them partially written; the next retry re-opens and overwrites again,
//restarting from a clean archive. A truncated/incomplete localization surfaces as a
//retryable input_stream_error (EOF mid-stream); a bad/wrong archive header
//(invalid_signature) or a GLIMPSE/boost version mismatch are non_retryable, since the
//bytes already read are wrong and retrying will never help.
try
{
boost::archive::binary_iarchive ia(ifs);
ia >> H;
ia >> V;
}
catch (const boost::archive::archive_exception& e)
{
std::string hint;
switch (e.code)
{
case boost::archive::archive_exception::unsupported_version:
case boost::archive::archive_exception::unsupported_class_version:
case boost::archive::archive_exception::unregistered_class:
case boost::archive::archive_exception::incompatible_native_format:
non_retryable = true;
hint = ". This looks like a version mismatch; please ensure you are using the same GLIMPSE and boost library versions";
break;
case boost::archive::archive_exception::invalid_signature:
non_retryable = true;
hint = ". The file is not a valid GLIMPSE binary reference panel";
break;
default:
//input_stream_error and the like (truncated/partial read mid-stream) are
//treated as transient and left retryable.
break;
}
err_msg = std::string("exception while parsing boost archive: ") + e.what() + hint;
return false;
}
catch (std::exception& e)
{
err_msg = std::string("exception while parsing boost archive: ") + e.what();
return false;
}

//(3) Sanity check the deserialized panel. An empty PBWT indicates a corrupt or
//incompletely localized file; treat as a retryable failure.
if (H.Ypacked.size() == 0)
{
err_msg = "empty PBWT detected after parsing (corrupt or incompletely localized binary file)";
return false;
}

return true;
}


void caller::setup_mpileup()
{
Expand Down
113 changes: 87 additions & 26 deletions phase/src/io/genotype_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
******************************************************************************/

#include <io/genotype_reader.h>
#include <io/retry_io.h>
#include <thread>
#include <chrono>

std::map<std::string, int> mapPloidy = {
{"1",1},
Expand Down Expand Up @@ -136,6 +139,22 @@ void genotype_reader::initReader(bcf_srs_t *sr, std::string& file, int nthreads)
}
}

int genotype_reader::openTargetReader(bcf_srs_t *sr, std::string& file, int nthreads)
{
sr->require_index = 1;
//Region/target setup failures are deterministic config errors, not transient cloud
//hiccups, so they stay fatal.
if (bcf_sr_set_regions(sr, V.input_gregion.c_str(), 0) == -1) vrb.error("Impossible to jump to region [" + V.input_gregion + "] in [" + file + "]");
if (bcf_sr_set_targets(sr, V.input_gregion.c_str(), 0, 0) == -1) vrb.error("Impossible to set target region [" + V.input_gregion + "] in [" + file + "]");

if (nthreads>1) bcf_sr_set_threads(sr, nthreads);

//A failed open/index load is the most common transient cloud-streaming failure;
//return the errnum (nonzero) so the caller can retry instead of aborting here.
if(!(bcf_sr_add_reader (sr, file.c_str()))) return sr->errnum ? sr->errnum : -1;
return 0;
}

void genotype_reader::readGenotypesAndBAMs(std::string fref,int nthreads)
{
bcf_srs_t * sr_scan = bcf_sr_init();
Expand Down Expand Up @@ -177,6 +196,7 @@ void genotype_reader::initReader(bcf_srs_t *sr, std::string& fmain, std::string&

void genotype_reader::readGenotypes(std::string fmain, std::string fref, int nthreads)
{
vrb.bullet("Reading target GLs from [" + fmain + "] via synced-reader path (VCF/BCF reference panel) -- NO streaming retries on this path");
bcf_srs_t * sr_scan = bcf_sr_init();

initReader(sr_scan, fmain, fref, nthreads);
Expand Down Expand Up @@ -245,6 +265,7 @@ void genotype_reader::scanGenotypesCommon(bcf_srs_t * sr, int ref_sr_n /* Refere
V.push(new variant (line_ref->pos + 1, std::string(line_ref->d.id), line_ref->d.allele[0], line_ref->d.allele[1], line_type, V.size(), cref, calt, line_type==VCF_SNP && (line_ref->pos != prev_pos)));
prev_pos=line_ref->pos;
}
if (sr->errnum) vrb.error("Error while scanning VCF/BCF file(s): " + std::string(bcf_sr_strerror(sr->errnum)));
free(vAC);
free(vAN);
if (H.n_tot_sites == 0) vrb.error("No variants to be imputed in files");
Expand All @@ -271,33 +292,54 @@ void genotype_reader::scanGenotypes(bcf_srs_t * sr) {
}
void genotype_reader::readTarGenotypes(std::string fmain, int nthreads)
{
//TODO //FIXME //TO TEST
bcf_srs_t * sr_scan = bcf_sr_init();
//Retries are not implemented in hfile_libcurl (used when streaming from a cloud
//location), so we retry transient read failures of each streaming pass here. The
//scan only accumulates into the local vec_pos_tar (reset by clearing it at the start
//of scanTarGenotypes) and reads the sample names/header, and the parse overwrites the
//genotype set by site index, so each retry restarts the pass from a clean state.
vrb.bullet("Reading target GLs from [" + fmain + "] via retry-enabled streaming path (binary reference panel)");
const int n_retry = 3;
const std::chrono::seconds base_delay(1);

initReader(sr_scan, fmain, nthreads);
M.n_tar_samples = bcf_hdr_nsamples(sr_scan->readers[0].header);
M.tar_sample_names = std::vector<std::string> (M.n_tar_samples);

for (int i = 0 ; i < M.n_tar_samples ; i ++)
M.tar_sample_names[i] = std::string(sr_scan->readers[0].header->samples[i]);
std::vector<variant*> vec_pos_tar;

vrb.wait(" * VCF/BCF scanning");
tac.clock();
retry_with_backoff("scanning target GLs [" + fmain + "]", n_retry, base_delay, [&]() -> attempt_result {
const int err = scanTarGenotypes(fmain, nthreads, vec_pos_tar);
return { err == 0, false, err ? std::string(bcf_sr_strerror(err)) : std::string() };
});

//Sample names + ploidy are now known from the (retried) scan pass; set_ploidy_tar
//allocates the genotype set, so it must run exactly once, after a successful scan and
//before the parse pass writes into it.
set_ploidy_tar();

vrb.wait(" * VCF/BCF scanning");
vrb.wait(" * VCF/BCF parsing");
tac.clock();
retry_with_backoff("parsing target GLs [" + fmain + "]", n_retry, base_delay, [&]() -> attempt_result {
const int err = parseTarGenotypes(fmain, nthreads, vec_pos_tar);
return { err == 0, false, err ? std::string(bcf_sr_strerror(err)) : std::string() };
});
}

//Scan file
bcf1_t * line_main = NULL;
int npl_main, npl_arr_main = 0, *pl_arr_main = NULL;
int ngl_main, ngl_arr_main = 0;
float *gl_arr_main = NULL;
const int max_ploidyP1=H.max_ploidy+1;
int *ptr;
int main_file_max_ploidy=0;
float *ptr_f;
std::vector<variant*> vec_pos_tar;
int genotype_reader::scanTarGenotypes(std::string fmain, int nthreads, std::vector<variant*>& vec_pos_tar)
{
vec_pos_tar.clear(); //reset any partial accumulation from a failed attempt

bcf_srs_t * sr_scan = bcf_sr_init();
const int open_err = openTargetReader(sr_scan, fmain, nthreads);
if (open_err) { bcf_sr_destroy(sr_scan); return open_err; }
sr_scan->max_unpack = BCF_UN_INFO;

//Read sample names/header from this (retried) pass, so a transient open failure here
//is retried rather than aborting the run in a separate, un-retried header open.
M.n_tar_samples = bcf_hdr_nsamples(sr_scan->readers[0].header);
M.tar_sample_names = std::vector<std::string> (M.n_tar_samples);
for (int i = 0 ; i < M.n_tar_samples ; i ++)
M.tar_sample_names[i] = std::string(sr_scan->readers[0].header->samples[i]);

bcf1_t * line_main = NULL;
while (bcf_sr_next_line (sr_scan))
{
if (!bcf_sr_has_line(sr_scan, 0) || bcf_sr_get_line(sr_scan, 0)->n_allele != 2) continue; //always ref
Expand All @@ -307,22 +349,37 @@ void genotype_reader::readTarGenotypes(std::string fmain, int nthreads)
if (vecS.size() > 0) vec_pos_tar.push_back(vecS[0]);
else vec_pos_tar.push_back(nullptr);
}

const int err = sr_scan->errnum;
bcf_sr_destroy(sr_scan);
return err;
}

vrb.wait(" * VCF/BCF parsing");
tac.clock();

bcf_srs_t * sr_parse = bcf_sr_init();
initReader(sr_parse, fmain, nthreads);
int genotype_reader::parseTarGenotypes(std::string fmain, int nthreads, const std::vector<variant*>& vec_pos_tar)
{
bcf1_t * line_main = NULL;
int npl_main, npl_arr_main = 0, *pl_arr_main = NULL;
int ngl_main, ngl_arr_main = 0;
float *gl_arr_main = NULL;
const int max_ploidyP1=H.max_ploidy+1;
int *ptr;
int main_file_max_ploidy=0;
float *ptr_f;
int i_site=0;
const int n_ref_haps = H.n_ref_haps;

bcf_srs_t * sr_parse = bcf_sr_init();
const int open_err = openTargetReader(sr_parse, fmain, nthreads);
if (open_err) { bcf_sr_destroy(sr_parse); return open_err; }
sr_parse->max_unpack = BCF_UN_ALL;
const int n_ref_haps = H.n_ref_haps;

while (bcf_sr_next_line (sr_parse))
{
if (!bcf_sr_has_line(sr_parse, 0) || bcf_sr_get_line(sr_parse, 0)->n_allele != 2) continue; //always ref
//The scan and parse passes are independent re-opens; if the parse pass yields more
//surviving sites than the scan recorded (e.g. the file changed between passes),
//indexing vec_pos_tar[i_site] would read past its end. Fail loudly instead.
if (i_site >= (int)vec_pos_tar.size())
vrb.error("Target GL file [" + fmain + "] returned more sites while parsing than while scanning (" + std::to_string(vec_pos_tar.size()) + "). The file may have changed between the two streaming passes.");
if (vec_pos_tar[i_site] == nullptr)
{
++i_site;
Expand Down Expand Up @@ -410,9 +467,11 @@ void genotype_reader::readTarGenotypes(std::string fmain, int nthreads)
++i_site;
}
}
const int err = sr_parse->errnum;
bcf_sr_destroy(sr_parse);
free(pl_arr_main);
free(gl_arr_main);
return err;
}

void genotype_reader::set_ploidy_tar()
Expand Down Expand Up @@ -642,6 +701,7 @@ void genotype_reader::parseGenotypes(bcf_srs_t * sr) {
prog_bar+=prog_step;
vrb.progress(" * VCF/BCF parsing", prog_bar);
}
if (sr->errnum) vrb.error("Error while parsing VCF/BCF file(s): " + std::string(bcf_sr_strerror(sr->errnum)));
free(pl_arr_main);
free(gl_arr_main);
free(gt_arr_ref);
Expand Down Expand Up @@ -700,6 +760,7 @@ void genotype_reader::parseRefGenotypes(bcf_srs_t * sr) {
prog_bar+=prog_step;
vrb.progress(" * Reference panel parsing ", prog_bar);
}
if (sr->errnum) vrb.error("Error while parsing reference panel: " + std::string(bcf_sr_strerror(sr->errnum)));
free(gt_arr_ref);

// Report
Expand Down
10 changes: 10 additions & 0 deletions phase/src/io/genotype_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,19 @@ class genotype_reader

void initReader(bcf_srs_t * sr, std::string& fmain, std::string& fref, int nthreads);
void initReader(bcf_srs_t * sr, std::string& file, int nthreads);
//Non-fatal open of a single (cloud-streamed) target file. Region/target setup errors
//are deterministic and stay fatal, but a failed open/index load returns the
//synced-reader errnum (nonzero) instead of exiting, so the streaming passes can retry
//a transient open failure rather than aborting the whole run on it.
int openTargetReader(bcf_srs_t * sr, std::string& file, int nthreads);

void scanGenotypes(bcf_srs_t * sr);
void readTarGenotypes(std::string , int);
//One streaming pass each over the target GL file. Return the synced-reader errnum
//(0 on success) so the caller can retry transient cloud-streaming read failures. The
//scan pass also reads the sample names/header (so no separate header open is needed).
int scanTarGenotypes(std::string fmain, int nthreads, std::vector<variant*>& vec_pos_tar);
int parseTarGenotypes(std::string fmain, int nthreads, const std::vector<variant*>& vec_pos_tar);
//void readTarGenotypesValidation(string, string, int);

void parseGenotypes(bcf_srs_t * sr);
Expand Down
Loading