Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datalad_registry/blueprints/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions datalad_registry/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class RepoUrl(db.Model): # type: ignore
branches = db.Column(JSONB)
tags = db.Column(db.Text)
git_objects_kb = db.Column(db.BigInteger)
has_run_records = db.Column(db.Boolean)

# ==== Fields mainly for operations ====

Expand Down
1 change: 1 addition & 0 deletions datalad_registry/root.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Blueprint for /
"""

import logging

from flask import Blueprint, redirect, url_for
Expand Down
50 changes: 45 additions & 5 deletions datalad_registry/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
get_origin_annex_uuid,
get_origin_branches,
get_wt_annexed_file_info,
has_datalad_run_records,
)

from .utils import allocate_ds_path, update_ds_clone, validate_url_is_processed
Expand All @@ -39,6 +40,25 @@
lgr = get_task_logger(__name__)


def _get_extractors_for_url(url: RepoUrl) -> list[str]:
    """
    Get the list of metadata extractors to run for a given RepoUrl

    :param url: The RepoUrl object
    :return: List of extractor names to run for this URL

    Note: This function conditionally includes the runprov extractor if the dataset
    has DataLad run records in its history.
    """
    # Copy so the list held in the app config is never mutated in place
    extractors = list(current_app.config["DATALAD_REGISTRY_METADATA_EXTRACTORS"])

    # Add runprov extractor if the dataset has run records, guarding against
    # a duplicate entry (and a duplicated extraction task) in case "runprov"
    # is already present in the configured extractor list
    if url.has_run_records and "runprov" not in extractors:
        extractors.append("runprov")

    return extractors


class ExtractMetaStatus(StrEnum):
SUCCEEDED = auto()
ABORTED = auto()
Expand Down Expand Up @@ -109,6 +129,8 @@ def _update_dataset_url_info(dataset_url: RepoUrl, ds: Dataset) -> None:
ds.repo.count_objects["size"] + ds.repo.count_objects["size-pack"]
)

dataset_url.has_run_records = has_datalad_run_records(ds)

dataset_url.last_update_dt = datetime.now(timezone.utc)


Expand All @@ -122,6 +144,8 @@ def _update_dataset_url_info(dataset_url: RepoUrl, ds: Dataset) -> None:
# === DANDI related extractors ===
"dandi": ["dandiset.yaml"],
"dandi:files": [".dandi/assets.json"],
# === Runprov extractor has no required files ===
# The runprov extractor only needs git history with DATALAD RUNCMD markers
}


Expand Down Expand Up @@ -182,6 +206,13 @@ def extract_ds_meta(ds_url_id: StrictInt, extractor: StrictStr) -> ExtractMetaSt
# Validate that the RepoUrl has been processed
validate_url_is_processed(url)

# Special handling for runprov extractor: only run if dataset has run records
if extractor == "runprov" and not url.has_run_records:
lgr.debug(
"Skipping runprov extractor for %s: dataset has no run records", url.url
)
return ExtractMetaStatus.ABORTED

# Absolute path of the dataset clone in cache
cache_path_abs = url.cache_path_abs
assert cache_path_abs is not None
Expand Down Expand Up @@ -231,10 +262,15 @@ def extract_ds_meta(ds_url_id: StrictInt, extractor: StrictStr) -> ExtractMetaSt
purpose=f"{extractor} metadata extraction",
)

# Map shorthand extractor names to their metalad equivalents
metalad_extractor = (
f"metalad_{extractor}" if extractor == "runprov" else extractor
)

results = parse_obj_as(
list[MetaExtractResult],
dl.meta_extract(
extractor,
metalad_extractor,
dataset=ds,
result_renderer="disabled",
on_failure="stop",
Expand All @@ -251,10 +287,16 @@ def extract_ds_meta(ds_url_id: StrictInt, extractor: StrictStr) -> ExtractMetaSt
if res.status == "ok":
# Record the metadata to the database
metadata_record = res.metadata_record
# Normalize the extractor name back from metalad equivalents
normalized_extractor_name = (
"runprov"
if metadata_record.extractor_name == "metalad_runprov"
else metadata_record.extractor_name
)
url_metadata = URLMetadata(
dataset_describe=get_head_describe(ds),
dataset_version=metadata_record.dataset_version,
extractor_name=metadata_record.extractor_name,
extractor_name=normalized_extractor_name,
extractor_version=metadata_record.extractor_version,
extraction_parameter=metadata_record.extraction_parameter,
extracted_metadata=metadata_record.extracted_metadata,
Expand Down Expand Up @@ -612,9 +654,7 @@ def chk_url_to_update(
is_record_updated = True

# Initiate extraction of metadata of the up-to-date dataset
for extractor in current_app.config[
"DATALAD_REGISTRY_METADATA_EXTRACTORS"
]:
for extractor in _get_extractors_for_url(url):
extract_ds_meta.apply_async(
(url.id, extractor), link_error=log_error.s()
)
Expand Down
1 change: 1 addition & 0 deletions datalad_registry/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions datalad_registry/tests/test_blueprints/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

73 changes: 72 additions & 1 deletion datalad_registry/tests/test_tasks/test_extract_ds_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from datalad_registry.tasks.utils.builtin_meta_extractors import (
InvalidRequiredFileError,
)
from datalad_registry.utils.datalad_tls import get_head_describe
from datalad_registry.utils.datalad_tls import clone, get_head_describe

from . import TEST_MIN_REPO_COMMIT_HEXSHA, TEST_MIN_REPO_TAG

Expand Down Expand Up @@ -279,3 +279,74 @@ def mock_dlreg_meta_extract(*_args, **_kwargs):
monkeypatch.setattr(tasks, "dlreg_meta_extract", mock_dlreg_meta_extract)

assert extract_ds_meta(repo_url.id, "dandi") is ExtractMetaStatus.ABORTED

def test_runprov_skipped_without_run_records(self, processed_ds_urls, flask_app):
    """
    Requesting runprov extraction must abort when the dataset carries
    no DataLad run records
    """
    url_id = processed_ds_urls[0]

    # Explicitly mark the dataset as having no run records
    with flask_app.app_context():
        repo_url = db.session.execute(
            db.select(RepoUrl).where(RepoUrl.id == url_id)
        ).scalar_one()
        repo_url.has_run_records = False
        db.session.commit()

    # The extraction task should bail out instead of running the extractor
    assert extract_ds_meta(url_id, "runprov") is ExtractMetaStatus.ABORTED

def test_runprov_runs_with_run_records(self, flask_app, base_cache_path, tmp_path):
    """
    Runprov metadata extraction must succeed for a dataset whose git
    history contains DataLad run records
    """
    from datalad import api as dl

    # Build a source dataset containing one `datalad run` commit
    ds = dl.create(path=tmp_path / "ds_with_run", annex=False)
    test_file = ds.pathobj / "input.txt"
    test_file.write_text("test content")
    ds.save(message="Add input file")

    ds.run(cmd="echo 'processed' > output.txt", message="Process input file")

    # Clone the dataset into the registry cache within an app context
    with flask_app.app_context():
        from datalad_registry.tasks.utils import allocate_ds_path

        ds_path_relative = allocate_ds_path()
        ds_path_absolute = base_cache_path / ds_path_relative
        clone(
            source=ds.path,
            path=ds_path_absolute,
            on_failure="stop",
            result_renderer="disabled",
        )

        # Register the URL as already processed, with run records flagged
        url = RepoUrl(
            url=f"file://{ds.path}",
            processed=True,
            cache_path=str(ds_path_relative),
            has_run_records=True,
        )
        db.session.add(url)
        db.session.commit()
        url_id = url.id

    # The extraction itself is expected to succeed
    result = extract_ds_meta(url_id, "runprov")
    assert result is ExtractMetaStatus.SUCCEEDED

    # Confirm a metadata record was persisted under the shorthand name
    with flask_app.app_context():
        metadata = db.session.execute(
            db.select(URLMetadata).filter_by(
                url_id=url_id, extractor_name="runprov"
            )
        ).scalar_one()
        # The saved record should carry runprov-specific fields
        assert "extracted_metadata" in vars(metadata)
        assert metadata.extractor_version is not None
        assert metadata.dataset_version is not None
1 change: 1 addition & 0 deletions datalad_registry/tests/test_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

49 changes: 49 additions & 0 deletions datalad_registry/tests/test_utils/test_datalad_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
get_origin_default_branch,
get_origin_upstream_branch,
get_wt_annexed_file_info,
has_datalad_run_records,
)

_TEST_MIN_DATASET_URL = "https://github.com/datalad/testrepo--minimalds.git"
Expand Down Expand Up @@ -272,3 +273,51 @@ def test_normal_operation(self, ds_name, branch_name, request, tmp_path):
l2_clone.repo.call_git(["push", "-u", "origin", branch_name])

assert get_origin_upstream_branch(l2_clone) == branch_name


class TestHasDataladRunRecords:
    """Tests for the `has_datalad_run_records` helper"""

    @staticmethod
    def _make_run_record_ds(path):
        # Create a plain-git (non-annex) dataset whose history contains one
        # `datalad run` commit, i.e. a commit with the DATALAD RUNCMD marker
        from datalad import api as dl

        ds = dl.create(path=path, annex=False)
        input_file = ds.pathobj / "input.txt"
        input_file.write_text("test content")
        ds.save(message="Add input file")
        ds.run(cmd="echo 'processed' > output.txt", message="Process input file")
        return ds

    def test_dataset_without_run_records(self, two_files_ds_non_annex):
        """
        A dataset without run records yields False
        """
        assert has_datalad_run_records(two_files_ds_non_annex) is False

    def test_dataset_with_run_records(self, tmp_path):
        """
        A dataset containing a `datalad run` commit yields True
        """
        ds = self._make_run_record_ds(tmp_path / "ds_with_run")
        assert has_datalad_run_records(ds) is True

    def test_cloned_dataset_with_run_records(self, tmp_path):
        """
        Run records inherited through a clone's history are detected
        """
        ds_orig = self._make_run_record_ds(tmp_path / "ds_orig")

        # Clone the dataset; the run-record commit travels with the history
        ds_clone = clone(source=ds_orig.path, path=tmp_path / "ds_clone")

        assert has_datalad_run_records(ds_clone) is True
38 changes: 38 additions & 0 deletions datalad_registry/utils/datalad_tls.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from dataclasses import dataclass
import logging
import re
from typing import Optional
from uuid import UUID

from datalad import api as dl
from datalad.api import Dataset

lgr = logging.getLogger(__name__)


@dataclass
class WtAnnexedFileInfo:
Expand Down Expand Up @@ -177,3 +180,38 @@ def get_origin_upstream_branch(ds: Dataset) -> str:
)

return match.group(1)


def has_datalad_run_records(ds: Dataset) -> bool:
    """
    Check if a dataset has any DataLad run command records in its git history

    :param ds: The given dataset
    :return: True if the dataset has at least one commit with "DATALAD RUNCMD"
        in the commit message, False otherwise

    Note: This function searches the git history for the marker string that
    DataLad uses to mark commits made by the `datalad run` command. It looks
    across all branches so run records are detected even when they are not on
    the default branch. Any failure of the underlying git invocation is logged
    and reported as False (best-effort semantics).
    """
    git_args = [
        "log",
        "--all",                    # run records may live on any branch
        "--grep=DATALAD RUNCMD",    # marker datalad run embeds in messages
        "--format=%H",
        "-n",
        "1",                        # a single match is enough to answer
    ]
    try:
        output = ds.repo.call_git(git_args)
        # Non-empty output means at least one run-record commit was found
        return bool(output.strip())
    except Exception as e:
        lgr.warning(
            "Failed to check for DataLad run records in dataset at %s: %s", ds.path, e
        )
        return False
27 changes: 27 additions & 0 deletions migrations/versions/f1a2b3c4d5e6_add_has_run_records_to_repourl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Add has_run_records to RepoUrl

Revision ID: f1a2b3c4d5e6
Revises: e35239d06d44
Create Date: 2026-02-06 16:38:15.749000

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "f1a2b3c4d5e6"
down_revision = "e35239d06d44"
branch_labels = None
depends_on = None


def upgrade():
    """Add the nullable `has_run_records` boolean column to `repo_url`"""
    # Batch mode is used so the operation also works on backends with
    # limited ALTER TABLE support (e.g. SQLite)
    new_column = sa.Column("has_run_records", sa.Boolean(), nullable=True)
    with op.batch_alter_table("repo_url", schema=None) as batch_op:
        batch_op.add_column(new_column)


def downgrade():
    """Drop the `has_run_records` column from `repo_url`, reverting upgrade"""
    table_name = "repo_url"
    with op.batch_alter_table(table_name, schema=None) as batch_op:
        batch_op.drop_column("has_run_records")