Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ disallow_incomplete_defs = false
check_untyped_defs = true
warn_return_any = false

# The Rust LaSRC package (lasrc-rs / `lasrc`) is a linux-64-only conda package
# absent from the dev/typecheck environment; the orchestration tasks import it
# lazily at runtime only.
[[tool.mypy.overrides]]
module = ["lasrc", "lasrc.*"]
ignore_missing_imports = true

[tool.pixi.workspace]
name = "hls-science-container"
channels = ["conda-forge"]
Expand Down
74 changes: 65 additions & 9 deletions src/hls-nextgen-orchestration/src/hls_nextgen_orchestration/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,50 @@

import click

from hls_nextgen_orchestration.constants import FMASK_VERSION
from hls_nextgen_orchestration.constants import FMASK_VERSION, LASRC_VERSION

if TYPE_CHECKING:
from hls_nextgen_orchestration.pipeline import Pipeline

logger = logging.getLogger(__name__)


def _normalize_fmask(
    ctx: click.Context, param: click.Parameter, value: str | None
) -> FMASK_VERSION:
    """Click callback that coerces a raw option value to an Fmask version.

    ``"5"`` and ``"v5"`` select ``"v5"``; every other value (including
    ``None``) selects ``"v4"``. ``ctx`` and ``param`` are accepted only to
    satisfy the callback signature and are not used.
    """
    if value == "5" or value == "v5":
        return "v5"
    return "v4"


def _fmask_option[F: Callable[..., Any]](func: F) -> F:
"""Shared ``--fmask-version`` option (env: FMASK_VERSION, also accepts '5')."""

def _normalize(
ctx: click.Context, param: click.Parameter, value: str | None
) -> FMASK_VERSION:
# FMASK_VERSION 5/v5 -> v5, anything else -> v4
return "v5" if value in ("5", "v5") else "v4"

return click.option(
"--fmask-version",
envvar="FMASK_VERSION",
default="v4",
callback=_normalize_fmask,
callback=_normalize,
help="Fmask version: v4 or v5 (env: FMASK_VERSION, also accepts '5').",
)(func)


def _lasrc_option[F: Callable[..., Any]](func: F) -> F:
"""Shared ``--lasrc-version`` option (env: LASRC_VERSION, also accepts 'rs')."""

def _normalize(
ctx: click.Context, param: click.Parameter, value: str | None
) -> LASRC_VERSION:
# LASRC_VERSION rust/rs -> rust, anything else -> c
return "rust" if value in ("rust", "rs") else "c"

return click.option(
"--lasrc-version",
envvar="LASRC_VERSION",
default="c",
callback=_normalize,
help="LaSRC version: c or rust (env: LASRC_VERSION, also accepts 'rs').",
)(func)


def _run(pipeline: Pipeline, pipeline_name: str) -> None:
"""Run a pipeline under aggregate metrics and exit with its exit code."""
try:
Expand Down Expand Up @@ -149,5 +167,43 @@ def landsat_tile(local_pathrows_dir: Path | None) -> None:
_run(construct_pipeline(local_pathrows_dir=local_pathrows_dir), "landsat-tile")


@cli.command("lasrc")
@click.option(
    "--granule",
    envvar="GRANULE",
    required=True,
    help="Sentinel-2 or Landsat granule ID, auto-detected (env: GRANULE).",
)
@click.option(
    "--local-granule",
    envvar="LOCAL_GRANULE",
    default=None,
    type=click.Path(path_type=Path),
    help="Pre-downloaded granule (.zip for S2, dir for Landsat) (env: LOCAL_GRANULE).",
)
@_lasrc_option
def lasrc(
    granule: str,
    local_granule: Path | None,
    lasrc_version: LASRC_VERSION,
) -> None:
    """Run the standalone LaSRC pipeline (Download -> LaSRC -> Upload).

    Temporary pipeline for C vs Rust (lasrc-rs) intercomparison. The Rust path
    runs straight off the downloaded scene; the C path runs the ESPA-conversion
    chain it requires. Neither path runs Fmask (LaSRC does not consume it).
    """
    # NOTE: the docstring above doubles as the command's ``--help`` text, so
    # it is user-facing output rather than internal documentation.

    # Imported inside the command body rather than at module import time.
    from hls_nextgen_orchestration.lasrc import construct_pipeline

    pipeline = construct_pipeline(
        granule_id=granule,
        lasrc_version=lasrc_version,
        local_granule=local_granule,
    )
    _run(pipeline, "lasrc")


if __name__ == "__main__":
cli()
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""Resolve LaSRC ancillary (aux) file paths for the Rust lasrc port.

The Rust LaSRC (``lasrc``/``lasrc-rs`` package) takes each ancillary input as an
explicit path, unlike the C wrappers which discover them from ``LASRC_AUX_DIR``
internally. This module maps the existing ``LASRC_AUX_DIR`` layout to the seven
paths the Rust ``process_scene`` API (``lasrc.pipeline.AuxFilePaths``) expects.

The keys returned by :func:`resolve_lasrc_aux_paths` deliberately match the
``AuxFilePaths`` field names so a caller can do ``AuxFilePaths(**paths)``.

Expected ``LASRC_AUX_DIR`` layout (see reference docs / known-good
``data/lasrc_aux/``)::

<aux>/
CMGDEM.hdf
ratiomapndwiexp.hdf
LDCMLUT/ # Landsat LUTs (ANGLE_NEW.hdf, RES_LUT/TRANS_LUT/AERO_LUT ...)
MSILUT/ # Sentinel-2 LUTs (same file names, different values)
LADS/<year>/ # daily VIIRS/MODIS aerosol/water-vapor/ozone data

The two ``ANGLE_NEW.hdf`` files (one per LUT dir) are NOT interchangeable, so
Landsat resolves against ``LDCMLUT/`` and Sentinel-2 against ``MSILUT/``.
"""

from __future__ import annotations

import datetime as dt
import os
from pathlib import Path

_LADS_AUX_SOURCES = ("VIIRS", "MODIS")


def _require(path: Path, description: str) -> Path:
    """Validate that ``path`` exists and hand it back unchanged.

    Parameters
    ----------
    path
        File or directory to check.
    description
        Human-readable label for the aux item, used in the error message.

    Raises
    ------
    FileNotFoundError
        If ``path`` does not exist.
    """
    if path.exists():
        return path

    message = (
        f"Could not find LaSRC aux {description}: {path}. "
        "Check LASRC_AUX_DIR and its LDCMLUT/MSILUT/LADS layout."
    )
    raise FileNotFoundError(message)


def _glob_one(directory: Path, pattern: str, description: str) -> Path:
    """Return the one entry of ``directory`` that matches ``pattern``.

    Parameters
    ----------
    directory
        Directory to search (non-recursively, via ``Path.glob``).
    pattern
        Glob pattern to match against entries of ``directory``.
    description
        Human-readable label for the aux item, used in error messages.

    Raises
    ------
    FileNotFoundError
        If nothing matches ``pattern``.
    ValueError
        If more than one entry matches (ambiguous aux data).
    """
    candidates = sorted(directory.glob(pattern))
    count = len(candidates)

    if count == 1:
        return candidates[0]

    if count == 0:
        raise FileNotFoundError(
            f"Could not find LaSRC aux {description} matching '{pattern}' "
            f"in {directory}. Check LASRC_AUX_DIR and its layout."
        )

    # Two or more hits: refuse to guess which file the caller meant.
    names = [candidate.name for candidate in candidates]
    raise ValueError(
        f"Ambiguous LaSRC aux {description}: multiple files match "
        f"'{pattern}' in {directory}: {names}"
    )


def resolve_lasrc_aux_paths(
    *,
    is_sentinel: bool,
    acquisition: dt.datetime,
    aux_dir: Path | None = None,
    aux_source: str = "VIIRS",
) -> dict[str, Path]:
    """Locate the seven aux inputs for the Rust LaSRC under the aux root.

    Parameters
    ----------
    is_sentinel
        When True the LUTs are taken from ``MSILUT/`` (Sentinel-2); otherwise
        from ``LDCMLUT/`` (Landsat).
    acquisition
        Scene acquisition datetime; its year and day-of-year pick the daily
        LADS file.
    aux_dir
        Aux root directory. When omitted, the ``LASRC_AUX_DIR`` environment
        variable is used instead.
    aux_source
        Which daily LADS product to look for: "VIIRS" (default) or "MODIS".

    Returns
    -------
    dict[str, Path]
        Paths keyed ``angle_hdf``, ``intref_hdf``, ``transm_hdf``,
        ``sphera_hdf``, ``wv_oz_hdf``, ``dem_hdf`` and ``ratio_hdf``, in that
        order. Each one was found on disk during resolution.

    Raises
    ------
    ValueError
        If ``aux_source`` is not a known source, or if a glob pattern matches
        more than one file.
    RuntimeError
        If ``aux_dir`` is omitted and ``LASRC_AUX_DIR`` is unset or empty.
    FileNotFoundError
        If the aux root, the LUT directory, or any required file is missing.
    """
    if aux_source not in _LADS_AUX_SOURCES:
        raise ValueError(
            f"Unknown aux_source {aux_source!r} (expected one of {_LADS_AUX_SOURCES})"
        )

    root = aux_dir
    if root is None:
        # No explicit root given: fall back to the environment.
        from_env = os.environ.get("LASRC_AUX_DIR")
        if not from_env:
            raise RuntimeError(
                "LASRC_AUX_DIR is not set; cannot resolve LaSRC aux data paths."
            )
        root = Path(from_env)
    root = _require(root, "root directory")

    lut_subdir = "MSILUT" if is_sentinel else "LDCMLUT"
    lut_dir = _require(root / lut_subdir, "LUT directory")

    # Entries are resolved one at a time, in the same order as the returned
    # keys, so the first missing item is the one that gets reported.
    resolved: dict[str, Path] = {}
    resolved["angle_hdf"] = _require(
        lut_dir / "ANGLE_NEW.hdf", "angle LUT (ANGLE_NEW.hdf)"
    )
    resolved["intref_hdf"] = _glob_one(
        lut_dir, "RES_LUT_*.hdf", "intrinsic reflectance LUT"
    )
    resolved["transm_hdf"] = _glob_one(lut_dir, "TRANS_LUT_*.hdf", "transmission LUT")
    resolved["sphera_hdf"] = _glob_one(lut_dir, "AERO_LUT_*.hdf", "spherical albedo LUT")
    resolved["wv_oz_hdf"] = _resolve_lads_file(root, acquisition, aux_source)
    resolved["dem_hdf"] = _require(root / "CMGDEM.hdf", "CMG DEM (CMGDEM.hdf)")
    resolved["ratio_hdf"] = _require(
        root / "ratiomapndwiexp.hdf", "band ratio / NDWI (ratiomapndwiexp.hdf)"
    )
    return resolved


def _resolve_lads_file(
    aux_dir: Path, acquisition: dt.datetime, aux_source: str
) -> Path:
    """Find the daily LADS water-vapor/ozone file for the acquisition date.

    The file is searched for in ``<aux_dir>/LADS/<year>/`` using a glob built
    from the four-digit year and zero-padded day-of-year of ``acquisition``:
    ``V*04ANC.A<year><doy>.*`` when ``aux_source`` is "VIIRS", and
    ``M*<year><doy>*`` for any other value (MODIS).

    Raises whatever the directory check and the single-match glob raise when
    the year directory is missing or the pattern matches zero or several files.
    """
    year = f"{acquisition:%Y}"
    doy = f"{acquisition:%j}"

    year_dir = _require(aux_dir / "LADS" / year, f"LADS directory for {year}")

    # VIIRS e.g. VJ104ANC.A2026073.001.h5 / VNP04ANC.A2026073...
    # MODIS e.g. MOD04... / MYD04...
    pattern = (
        f"V*04ANC.A{year}{doy}.*" if aux_source == "VIIRS" else f"M*{year}{doy}*"
    )
    description = f"{aux_source} daily water-vapor/ozone (DOY {doy})"
    return _glob_one(year_dir, pattern, description)
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
"v5": "Fmask v5.0.1",
}

# LaSRC implementations:
# - "c": the C espa-surface-reflectance LaSRC (do_lasrc_*.py over ESPA XML)
# - "rust": the lasrc-rs port (runs directly on the raw scene directory)
LASRC_VERSION = Literal["c", "rust"]


@dataclass(frozen=True, kw_only=True)
class HlsVersion:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
import shutil
from dataclasses import dataclass, field, replace
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, ClassVar

import boto3

from hls_nextgen_orchestration.base import (
Asset,
AssetBundle,
Assets,
DataSource,
Task,
TaskFailure,
Expand Down Expand Up @@ -324,7 +325,7 @@ class ConvertScanline(Task):

# Requires "FMASK_BIN" to keep granule dir clean since Fmask
# will have issues if this runs first.
requires = (GRANULE_DIR, FMASK_BIN)
requires: ClassVar[Assets] = (GRANULE_DIR, FMASK_BIN)
provides = (SCANLINE_DONE,)

def __post_init__(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Standalone "just-LaSRC" pipeline for C vs Rust intercomparison.

This is a temporary, debug-oriented pipeline that runs only the LaSRC step
(Download -> LaSRC -> Upload) so the C LaSRC and the Rust ``lasrc-rs`` port can
be compared on performance and accuracy. It intentionally does NOT run the full
production workflow (the downstream to/from-HDF steps don't support the Rust
output yet). Once the Rust path is validated and wired into the main pipeline,
this module and the ``--lasrc-version`` flag are expected to be removed.
"""

from .workflow import construct_pipeline

__all__ = ["construct_pipeline"]
Loading
Loading