Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pipeline_steps/common_extraction_functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ change_column_names <- function(gwas, columns = list(), remove_extra_columns = F
if (name %in% names(gwas) && already) {
gwas <- gwas[, -which(names(gwas) %in% c(name))]
}
names(gwas)[names(gwas) == columns[name]] <- name
names(gwas)[names(gwas) == columns[[name]]] <- name
}

if (remove_extra_columns) {
Expand Down
2 changes: 1 addition & 1 deletion tests/testing_complete.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SUCCESS: All tests passed on branch: feature/dont-fail-bad-rows
SUCCESS: All tests passed on branch: bug/mismatch-headers
56 changes: 35 additions & 21 deletions worker/pipeline_worker.R
Original file line number Diff line number Diff line change
Expand Up @@ -176,33 +176,44 @@ process_message <- function(original_gwas_info, original_payload = NULL) {
verification_result <- verify_gwas_data(gwas_info, gwas)
if (!verification_result$valid) {
flog.error(paste(gwas_info$metadata$guid, verification_result$error))
send_update_gwas_upload(gwas_info, FALSE, paste("Caught error: ", verification_result$error))
return()
err <- paste("Caught error: ", verification_result$error)
send_update_gwas_upload(gwas_info, FALSE, err)
return(err)
}
gwas <- verification_result$gwas
gwas_original_columns <- verification_result$gwas_original_columns
if (!is.null(verification_result$n_removed) && verification_result$n_removed > 0) {
flog.info(paste(
gwas_info$metadata$guid,
verification_result$removed_message
))
}

updated_gwas <- change_column_names(gwas, gwas_info$metadata$column_names)
if (!"EAF" %in% colnames(updated_gwas)) {
gwas$EAF <- NA
write_path <- gwas_summary_writable_path(gwas_info$metadata$file_location)
vroom::vroom_write(gwas, write_path)
if (write_path != gwas_info$metadata$file_location) {
gwas_info$metadata$file_location <- write_path
gwas_info$file_location <- write_path
create_study_metadata_files(gwas_info)
flog.info(paste(
gwas_info$metadata$guid,
"Wrote expanded summary stats to TSV (archives are read-only for vroom_write):",
write_path
))
}
flog.info(paste(gwas_info$metadata$guid, "Added EAF column (NA) for LD panel fill-in during standardisation"))
if (!"EAF" %in% colnames(gwas)) {
gwas_original_columns$EAF <- NA
flog.info(paste(
gwas_info$metadata$guid,
"Added EAF column (NA) for LD panel fill-in during standardisation"
))
}

write_path <- gwas_summary_writable_path(gwas_info$metadata$file_location)
vroom::vroom_write(gwas_original_columns, write_path)
if (write_path != gwas_info$metadata$file_location) {
gwas_info$metadata$file_location <- write_path
gwas_info$file_location <- write_path
create_study_metadata_files(gwas_info)
flog.info(paste(
gwas_info$metadata$guid,
"Wrote summary stats for extraction (original column names; .zip unpacked to TSV):",
write_path
))
} else {
flog.info(paste(
gwas_info$metadata$guid,
"Wrote filtered summary stats for extraction (original column names):",
write_path
))
}

flog.info(paste(gwas_info$metadata$guid, "Extracting regions"))
Expand Down Expand Up @@ -232,7 +243,7 @@ process_message <- function(original_gwas_info, original_payload = NULL) {
check_pipeline_step_complete(ld_blocks_to_colocalise_file, "updated_ld_blocks_to_colocalise.tsv", output)

# cleaning up memory, so it utilises less in the parallel processes
rm(gwas, updated_gwas, gwas_data)
rm(gwas, gwas_original_columns, gwas_data)
gc()

ld_blocks_to_colocalise <- vroom::vroom(ld_blocks_to_colocalise_file, show_col_types = F)
Expand Down Expand Up @@ -299,7 +310,6 @@ process_message <- function(original_gwas_info, original_payload = NULL) {
flog.error(paste(guid, "Error call:", deparse(e$call)))
}

# Create error details
message_and_error <- list(
original_message = original_payload,
error = error_msg,
Expand All @@ -322,6 +332,7 @@ verify_gwas_data <- function(gwas_info, gwas) {
)
}, gwas_info$metadata$column_names)

gwas_original_columns <- gwas
gwas <- change_column_names(gwas, gwas_info$metadata$column_names)
mandatory_columns <- c("CHR", "BP", "P", "EA", "OA")

Expand Down Expand Up @@ -365,6 +376,7 @@ verify_gwas_data <- function(gwas_info, gwas) {
}

gwas <- gwas[!drop, , drop = FALSE]
gwas_original_columns <- gwas_original_columns[!drop, , drop = FALSE]
n_removed <- n_before - nrow(gwas)

if (nrow(gwas) == 0) {
Expand All @@ -383,7 +395,8 @@ verify_gwas_data <- function(gwas_info, gwas) {
" SNP rows were removed due to missing or invalid data (",
err_detail, ")."
),
gwas = gwas
gwas = gwas,
gwas_original_columns = gwas_original_columns
))
}

Expand All @@ -396,6 +409,7 @@ verify_gwas_data <- function(gwas_info, gwas) {
valid = TRUE,
error = NULL,
gwas = gwas,
gwas_original_columns = gwas_original_columns,
n_removed = n_removed,
removed_message = removed_msg
))
Expand Down
Loading