Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
* Moved db connection setup and teardown from test setup to `npgmlwarehouse.db.utils` to ease
reuse
* Updated ORM mappings
* Upsert functionality for iRODS collections in the iRODS location table `seq_product_irods_locations`

### Fixed

Expand Down
69 changes: 46 additions & 23 deletions src/npgmlwarehouse/db/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,44 +49,67 @@ def get_ultimagen_target_product_records(session: Session, id_run: int):

def create_upload_irods_location_records(
session: Session,
product_collection: dict[str, str],
platform_name: str,
product_data: dict[str, dict],
seq_platform_name: str,
pipeline_name: str,
):
"""
Insert product records into the iRODS location table
(`seq_product_irods_locations`) identified by their product IDs.
In the case of a duplicate entry in the database which corresponds
to a duplicate unique key of (`id_product`,`irods_root_collection`),
the insertion is ignored and the function will continue normally
with no exception.
Insert product records, identified by their product IDs, into the iRODS location
table `seq_product_irods_locations`. If a record with the same `id_product` and
`irods_root_collection` already exists in the database, no error is raised;
the existing record is updated with the new values instead.

`product_data` should have the following structure, where the last two keys are
optional. When an optional key is not specified, the corresponding column is set to NULL:

product_data[`id_product`] = {
"irods_root_collection": "/irods/path/to/collection1",
"irods_data_relative_path": "/irods/path/to/data1",
"irods_secondary_data_relative_path": "/irods/path/to/secondary/data1",
}

Args:
session (Session):
Database connection Session
product_collection (dict[str,str]):
Dictionary of (sequencing product ID), (iRODS collection path)
platform_name (str):
Name of the platform
product_data (dict[str, dict]):
Dictionary composed in the following way:
key: sequencing product ID (str)
value: Dictionary of Column (str), iRODS path (str).
Column names:
`irods_root_collection`, `irods_data_relative_path`, `irods_secondary_data_relative_path`
seq_platform_name (str):
Platform name common to all records
pipeline_name (str):
Name of the pipeline
Pipeline name common to all records

Returns:
None
"""
if not product_collection:
if not product_data:
return

to_insert = [
{
to_insert = []
for id_product, data in product_data.items():
data_to_insert = {
"id_product": id_product,
"seq_platform_name": platform_name,
"seq_platform_name": seq_platform_name,
"pipeline_name": pipeline_name,
"irods_root_collection": coll,
"irods_root_collection": data["irods_root_collection"],
"irods_data_relative_path": data.get("irods_data_relative_path", None),
"irods_secondary_data_relative_path": data.get(
"irods_secondary_data_relative_path", None
),
}
for id_product, coll in product_collection.items()
]
session.execute(
insert(SeqProductIrodsLocations).values(to_insert).prefix_with("IGNORE")
)
to_insert.append(data_to_insert)

insert_query = insert(SeqProductIrodsLocations).values(to_insert)
on_duplicate_kwargs = {
"seq_platform_name": insert_query.inserted.seq_platform_name,
"pipeline_name": insert_query.inserted.pipeline_name,
"irods_data_relative_path": insert_query.inserted.irods_data_relative_path,
"irods_secondary_data_relative_path": insert_query.inserted.irods_secondary_data_relative_path,
}

insert_on_duplicate = insert_query.on_duplicate_key_update(**on_duplicate_kwargs)
session.execute(insert_on_duplicate)
session.commit()
Loading