Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ COPY --chown=$MAMBA_USER:$MAMBA_USER docker/specfile.txt /tmp/specfile.txt
# "Operation too slow"), which would otherwise fail the whole build on a single
# flaky transfer. Retry the install a few times before giving up.
RUN for attempt in 1 2 3 4 5; do \
micromamba install --yes --channel conda-forge -n base -f /tmp/specfile.txt && break; \
if [ "$attempt" = "5" ]; then echo "micromamba install failed after $attempt attempts" && exit 1; fi; \
echo "micromamba install attempt $attempt stalled; retrying in 15s..." && sleep 15; \
micromamba install --yes --channel conda-forge -n base -f /tmp/specfile.txt && break; \
if [ "$attempt" = "5" ]; then echo "micromamba install failed after $attempt attempts" && exit 1; fi; \
echo "micromamba install attempt $attempt stalled; retrying in 15s..." && sleep 15; \
done && \
micromamba clean --all --yes

Expand All @@ -82,8 +82,8 @@ ARG MAMBA_DOCKERFILE_ACTIVATE=1
# satisfies spurt's `numpy>=1.23`, so install without the constraint.
RUN pip install git+https://github.com/isce-framework/spurt@v0.1.1
# --no-deps because they are installed with conda
RUN pip install rich && pip install --no-deps git+https://github.com/opera-adt/opera-utils@v0.25.7
RUN pip install tyro && pip install --no-deps git+https://github.com/isce-framework/dolphin@v0.42.6
RUN pip install rich && pip install --no-deps git+https://github.com/opera-adt/opera-utils@v0.25.8
RUN pip install tyro && pip install --no-deps git+https://github.com/isce-framework/dolphin@v0.42.7

COPY --chown=$MAMBA_USER:$MAMBA_USER . .
RUN python -m pip install --no-deps .
Expand Down
153 changes: 140 additions & 13 deletions src/disp_nisar/_remote_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

import logging
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Iterable, Sequence
from typing import Iterable, NamedTuple, Sequence

import h5netcdf
import h5py
from opera_utils import is_remote_url
from opera_utils._types import PathOrStr
from tenacity import retry, stop_after_attempt, wait_fixed

from ._streaming import S3Path
Expand Down Expand Up @@ -417,16 +419,7 @@ def parallel_s3_download(
from ._streaming import get_authorized_s3_client

s3_client = get_authorized_s3_client(dataset="nisar")
# for url in s3_urls:
# out = _download_file(
# s3_client,
# url,
# output_dir,
# raw_dir,
# frequencies,
# polarization,
# )
# downloaded_files.append[out]

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {}
for url in s3_urls:
Expand Down Expand Up @@ -495,6 +488,130 @@ def _download_file(
return final


class SubsetResult(NamedTuple):
url: str
output: str
error: str


def download_and_subset_from_private_bucket(
    urls: Sequence[str],
    raw_dir: PathOrStr,
    output_dir: PathOrStr,
    frequencies: Sequence[str],
    polarization: str,
    *,
    num_workers: int = 4,
    per_file_concurrency: int = 8,
    aws_profile: str | None = None,
) -> list:
    """Download each ``s3://bucket/key`` URL, subset it, delete the original.

    Each URL is handled as one unit: download -> ``_extract_subset`` -> delete.
    A bounded thread pool runs at most ``num_workers`` units at once, so peak
    local disk stays ~ ``num_workers x (original + subset)``. Downloads overlap;
    the h5py repack is serialised behind a lock because HDF5 is not thread-safe.

    ``_extract_subset`` must be importable in the enclosing scope.

    Parameters
    ----------
    urls
        ``s3://bucket/key`` URLs to fetch.
    raw_dir
        Directory that receives the full downloaded files (created if needed).
    output_dir
        Directory that receives the subset files (created if needed).
    frequencies, polarization
        Passed through unchanged to ``_extract_subset``.
    num_workers
        Maximum number of URLs processed concurrently.
    per_file_concurrency
        ``max_concurrency`` of the boto3 transfer used for each download.
    aws_profile
        Profile name handed to ``boto3.Session``.

    Returns
    -------
    list
        One entry per input URL, **in the same order as** ``urls``. Each entry
        is the ``Path`` of the subset file, or the string ``"None"`` when no
        subset was produced for that URL (malformed URL, download failure,
        subset failure, or ``_extract_subset`` wrote nothing). Failures are
        logged, not raised.
    """
    from urllib.parse import urlparse

    import boto3
    from boto3.s3.transfer import TransferConfig
    from botocore.config import Config

    log = logging.getLogger(__name__)

    urls = list(urls)
    if not urls:
        log.warning("no urls given")
        return []

    download_dir = Path(raw_dir)
    output_dir = Path(output_dir)
    download_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)

    # One client, shared across threads. The pool must hold enough connections
    # for every worker to run its own multipart download at the same time.
    session = boto3.Session(profile_name=aws_profile)
    client = session.client(
        "s3",
        config=Config(
            max_pool_connections=num_workers * per_file_concurrency + 4,
            retries={"max_attempts": 10, "mode": "adaptive"},
        ),
    )
    transfer_config = TransferConfig(max_concurrency=per_file_concurrency)
    repack_lock = threading.Lock()

    def _split(url: str) -> tuple[str, str]:
        """Split ``s3://bucket/key`` into ``(bucket, key)`` or raise ValueError."""
        p = urlparse(url)
        if p.scheme != "s3" or not p.netloc or not p.path.strip("/"):
            raise ValueError(f"not an s3://bucket/key url: {url!r}")
        return p.netloc, p.path.lstrip("/")

    def _one(url: str) -> SubsetResult:
        """Run download -> subset -> cleanup for one URL; never raises on failure."""
        try:
            bucket, key = _split(url)
        except ValueError as e:
            return SubsetResult(url, "None", str(e))

        # Both files are named after the key's basename only.
        local = download_dir / Path(key).name
        dst = output_dir / Path(key).name

        # 1) download (skipped when the subset already exists)
        # NOTE(review): the download is repeated even if `local` is still on
        # disk from an earlier failed repack -- confirm whether reuse is wanted.
        try:
            if not dst.exists():
                client.download_file(bucket, key, str(local), Config=transfer_config)
        except Exception as e:  # noqa: BLE001
            local.unlink(missing_ok=True)  # drop any partial file
            log.exception("download failed: %s", url)
            return SubsetResult(url, "None", f"download failed: {e}")

        # 2) subset / repack (serialised: HDF5 is not thread-safe)
        try:
            with repack_lock:
                if local.exists():
                    _extract_subset(local, dst, frequencies, polarization)
        except Exception as e:  # noqa: BLE001
            dst.unlink(missing_ok=True)  # drop partial subset
            # Keep the original so the repack can be retried.
            log.exception("subset failed (original kept): %s", url)
            return SubsetResult(url, "None", f"subset failed: {e}")

        # `_extract_subset` no-ops for unrecognised products: only delete the
        # original if a subset was actually written.
        if not dst.exists():
            log.info("no subset produced for %s (unrecognised product?)", url)
            return SubsetResult(url, "None", "None")

        local.unlink(missing_ok=True)
        return SubsetResult(url, dst, "None")

    log.info(
        "%d urls; starting bounded pipeline (num_workers=%d, per_file_concurrency=%d)",
        len(urls),
        num_workers,
        per_file_concurrency,
    )

    # Slot results by input index: futures finish in arbitrary order, but the
    # caller needs outputs that line up with `urls`.
    results: list = [None] * len(urls)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        future_to_idx = {
            pool.submit(_one, url): idx for idx, url in enumerate(urls)
        }
        for i, fut in enumerate(as_completed(future_to_idx), start=1):
            res = fut.result()
            results[future_to_idx[fut]] = res
            status = "ok" if res.error == "None" else f"ERROR ({res.error})"
            log.info("[%d/%d] %s -> %s", i, len(urls), res.url, status)

    n_ok = sum(r.error == "None" for r in results)
    log.info("done: %d ok, %d failed", n_ok, len(results) - n_ok)
    return [r.output for r in results]


def stage_remote_inputs(
urls: Iterable[str | Path],
scratch_dir: Path,
Expand Down Expand Up @@ -550,6 +667,8 @@ def stage_remote_inputs(

https_urls = [u for u in url_list if u.startswith("https://")]
s3_urls = [u for u in url_list if u.startswith("s3://")]
s3_urls_private = [u for u in s3_urls if "cumulus" not in u]
s3_urls_asf = [u for u in s3_urls if u not in s3_urls_private]

if https_urls:
with ThreadPoolExecutor(max_workers=n_workers) as pool:
Expand All @@ -567,9 +686,17 @@ def stage_remote_inputs(
for fut in as_completed(future_to_idx):
i = future_to_idx[fut]
out_paths[i] = fut.result()
elif s3_urls:
elif s3_urls_private:
out_paths = download_and_subset_from_private_bucket(
urls=s3_urls_private,
raw_dir=raw_dir,
output_dir=scratch_dir,
frequencies=frequencies,
polarization=polarization,
)
else:
out_paths = parallel_s3_download(
s3_urls=s3_urls,
s3_urls=s3_urls_asf,
output_dir=scratch_dir,
raw_dir=raw_dir,
frequencies=frequencies,
Expand Down
Loading
Loading