Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/datachain/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datachain.client import BucketStatus, bucket_status
from datachain.lib.data_model import DataModel, DataType, is_chain_type
from datachain.lib.dc import (
C,
Expand Down Expand Up @@ -53,6 +54,7 @@
"Audio",
"AudioFile",
"AudioFragment",
"BucketStatus",
"C",
"Column",
"ColumnExpr",
Expand All @@ -75,6 +77,7 @@
"VideoFile",
"VideoFragment",
"VideoFrame",
"bucket_status",
"create_project",
"datasets",
"delete_dataset",
Expand Down
14 changes: 14 additions & 0 deletions src/datachain/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datachain.studio import process_pipeline_args

from .commands import (
bucket_status_cmd,
clear_cache,
completion,
du,
Expand Down Expand Up @@ -86,6 +87,7 @@ def handle_command(args, catalog, client_config) -> int:
from datachain.studio import process_auth_cli_args, process_jobs_args

command_handlers = {
"bucket": lambda: handle_bucket_command(args, client_config),
"cp": lambda: handle_cp_command(args, catalog),
"clone": lambda: handle_clone_command(args, catalog),
"dataset": lambda: handle_dataset_command(args, catalog),
Expand Down Expand Up @@ -113,6 +115,18 @@ def handle_command(args, catalog, client_config) -> int:
return 1


def handle_bucket_command(args, client_config) -> int:
    """Dispatch `datachain bucket <subcommand>` and return a process exit code."""
    subcommand = args.bucket_cmd
    if subcommand == "status":
        return bucket_status_cmd(args.uri, client_config=client_config)
    if subcommand is not None:
        # A subcommand the parser accepted but this dispatcher does not know.
        raise NotImplementedError(f"Unexpected bucket subcommand: {subcommand}")
    # No subcommand given: point the user at the help text and fail.
    print(
        f"Use 'datachain {args.command} --help' to see available options",
        file=sys.stderr,
    )
    return 1


def handle_cp_command(args, catalog):
catalog.cp(
args.sources,
Expand Down
2 changes: 2 additions & 0 deletions src/datachain/cli/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .bucket import bucket_status_cmd
from .datasets import edit_dataset, list_datasets, list_datasets_local, rm_dataset
from .du import du
from .index import index
Expand All @@ -6,6 +7,7 @@
from .show import show

__all__ = [
"bucket_status_cmd",
"clear_cache",
"completion",
"du",
Expand Down
21 changes: 21 additions & 0 deletions src/datachain/cli/commands/bucket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import sys

from datachain.client import bucket_status


def bucket_status_cmd(uri: str, client_config: dict | None = None) -> int:
    """Report existence and access level of a bucket/container.

    Prints a short summary to stdout (and any error detail to stderr).
    Returns 0 if the bucket exists, 1 if not found.
    Raises on network errors.
    """
    result = bucket_status(uri, **(client_config or {}))

    if result.exists:
        print("Status: exists")
        print(f"Access: {result.access}")
    else:
        print("Status: not found")
    if result.error:
        print(f"Error: {result.error}", file=sys.stderr)
    return 0 if result.exists else 1
29 changes: 29 additions & 0 deletions src/datachain/cli/parser/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,35 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915
subp.add_parser("internal-run-udf", parents=[parent_parser])
subp.add_parser("internal-run-udf-worker", parents=[parent_parser])

parse_bucket = subp.add_parser(
"bucket",
parents=[parent_parser],
description="Commands for cloud storage buckets.",
formatter_class=CustomHelpFormatter,
)
bucket_subparsers = parse_bucket.add_subparsers(
dest="bucket_cmd",
help="Use `datachain bucket CMD --help` for command-specific help",
)

parse_bucket_status = bucket_subparsers.add_parser(
"status",
parents=[parent_parser],
description=(
"Check existence and access level of a bucket without listing objects."
),
formatter_class=CustomHelpFormatter,
)
add_anon_arg(parse_bucket_status)
parse_bucket_status.add_argument(
"uri",
type=str,
help=(
"Bucket URI to check, e.g. s3://my-bucket/, gs://my-bucket/, "
"az://my-container/. Any path beyond the bucket name is ignored."
),
)

add_completion_parser(subp, [parent_parser])
return parser

Expand Down
29 changes: 27 additions & 2 deletions src/datachain/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,28 @@
from .fsspec import Client
from .fsspec import BucketStatus, Client

__all__ = ["Client"]

def bucket_status(uri: str, **client_config) -> BucketStatus:
    """Check bucket existence and access level without listing objects.

    Args:
        uri: Bucket URI, e.g. "s3://my-bucket/", "gs://my-bucket/", "az://my-container/"
        **client_config: Storage client configuration (anon, aws_key, etc.)
            For Azure, pass ``account_name`` to enable anonymous access detection.
            Without it, Azure container status detection may fail and report the
            container as non-existent or access as ``denied``.

    Returns:
        BucketStatus(exists, access) where access is one of:
        'anonymous', 'authenticated', 'denied'

    Raises:
        ValueError: if ``uri`` contains a path beyond the bucket/container name.
    """
    client_cls = Client.get_implementation(uri)
    name, path = client_cls.split_url(uri)
    if path:
        raise ValueError(
            f"URI must not contain a path component, got: {uri!r}. "
            "Use just the bucket/container URI, e.g. s3://my-bucket/"
        )
    return client_cls.bucket_status(name, **client_config)


__all__ = ["BucketStatus", "Client", "bucket_status"]
55 changes: 54 additions & 1 deletion src/datachain/client/azure.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,71 @@
from typing import Any

from adlfs import AzureBlobFileSystem
from fsspec.asyn import get_loop, sync

from datachain.lib.file import File
from datachain.progress import tqdm

from .fsspec import DELIMITER, Client, ResultQueue
from .fsspec import DELIMITER, BucketStatus, Client, ResultQueue


class AzureClient(Client):
FS_CLASS = AzureBlobFileSystem
PREFIX = "az://"
protocol = "az"

@classmethod
def bucket_status(cls, name: str, **kwargs) -> BucketStatus:
    """Check Azure container existence and access level without listing blobs.

    Probes with configured credentials first, then anonymously; returns a
    BucketStatus describing whether the container exists and how it was
    reachable. Network errors propagate as exceptions.
    """
    # Step 1: Authenticated probe (uses env vars / DefaultAzureCredential).
    # Try this first so that authenticated access is preferred over anonymous
    # when credentials are available in the environment.
    # create_fs() may raise ValueError if account_name/connection_string
    # is not available (az:// URIs don't carry the account name).
    try:
        auth_fs = cls.create_fs(**kwargs)
        sync(get_loop(), auth_fs._info, name)
        return BucketStatus(exists=True, access="authenticated")
    except PermissionError:
        return BucketStatus(
            exists=True,
            access="denied",
            error=f"Access denied to Azure container '{name}'"
            " — check credentials/configuration",
        )
    except FileNotFoundError:
        return BucketStatus(
            exists=False,
            access="denied",
            error=f"Azure container '{name}' not found",
        )
    except ValueError:
        pass  # No credentials configured; fall through to anonymous probe.

    # Step 2: Anonymous probe — only reached when no credentials are configured.
    # Forward connection kwargs (e.g. account_name) but strip credential fields.
    az_credential_keys = {
        "credential",
        "connection_string",
        "sas_token",
        "client_secret",
        "client_id",
        "tenant_id",
    }
    anon_kwargs = {k: v for k, v in kwargs.items() if k not in az_credential_keys}
    anon_kwargs["anon"] = True
    try:
        anon_fs = AzureBlobFileSystem(**anon_kwargs)
        sync(get_loop(), anon_fs._info, name)
        return BucketStatus(exists=True, access="anonymous")
    except PermissionError as e:
        return BucketStatus(
            exists=True,
            access="denied",
            error=f"Access denied to Azure container '{name}' — {e}",
        )
    except (FileNotFoundError, ValueError) as e:
        return BucketStatus(exists=False, access="denied", error=str(e))

def info_to_file(self, v: dict[str, Any], path: str) -> File:
version_id = v.get("version_id") if self._is_version_aware() else None
return File(
Expand Down
20 changes: 19 additions & 1 deletion src/datachain/client/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from collections.abc import AsyncIterator, Iterator, Sequence
from datetime import datetime
from shutil import copy2
from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, NamedTuple
from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, Literal, NamedTuple
from urllib.parse import urlparse

from dvc_objects.fs.system import reflink
Expand Down Expand Up @@ -57,6 +57,12 @@ class Bucket(NamedTuple):
created: datetime | None


class BucketStatus(NamedTuple):
exists: bool
access: Literal["anonymous", "authenticated", "denied"]
Comment thread
amritghimire marked this conversation as resolved.
error: str | None = None


class Client(ABC):
MAX_THREADS = multiprocessing.cpu_count()
FS_CLASS: ClassVar[type["AbstractFileSystem"]]
Expand Down Expand Up @@ -177,6 +183,18 @@ def ls_buckets(cls, **kwargs) -> Iterator[Bucket]:
created=entry.get("CreationDate"),
)

@classmethod
def bucket_status(cls, name: str, **kwargs) -> "BucketStatus":
    """Probe whether bucket *name* exists and how it can be accessed.

    Concrete clients override this. The returned BucketStatus carries:
      - exists: whether the bucket/container exists
      - access: 'anonymous' | 'authenticated' | 'denied'

    Network errors propagate as exceptions.

    Raises:
        NotImplementedError: always, for clients without support.
    """
    raise NotImplementedError(f"bucket_status is not supported for {cls.__name__}")

@classmethod
def is_root_url(cls, url) -> bool:
return url == cls.PREFIX
Expand Down
63 changes: 62 additions & 1 deletion src/datachain/client/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
from fsspec.asyn import get_loop, sync
from fsspec.callbacks import DEFAULT_CALLBACK, Callback
from gcsfs import GCSFileSystem
from gcsfs.retry import HttpError
from google.api_core import exceptions as google_exceptions

from datachain.client.fileslice import FileWrapper
from datachain.lib.file import File
from datachain.progress import tqdm

from .fsspec import DELIMITER, Client, ResultQueue
from .fsspec import DELIMITER, BucketStatus, Client, ResultQueue

# Patch gcsfs for consistency with s3fs
GCSFileSystem.set_session = GCSFileSystem._set_session
Expand All @@ -36,6 +38,65 @@ def create_fs(cls, **kwargs) -> GCSFileSystem:

return cast("GCSFileSystem", super().create_fs(**kwargs))

@classmethod
def bucket_status(cls, name: str, **kwargs) -> BucketStatus:  # noqa: PLR0911
    """Check GCS bucket existence and access level without listing objects.

    Probes anonymously first, then with configured credentials; returns a
    BucketStatus describing whether the bucket exists and how it was
    reachable. Network errors propagate as exceptions.
    """
    anon_only = bool(kwargs.get("anon"))

    # Step 1: Try anonymous.
    # Use _ls (objects.list API) not _info (buckets.get API): GCS does not
    # grant storage.buckets.get anonymously even for public buckets.
    anon_kwargs = {k: v for k, v in kwargs.items() if k != "anon"}
    anon_kwargs["anon"] = True
    anon_fs = cls.create_fs(**anon_kwargs)
    try:
        sync(get_loop(), anon_fs._ls, name)
        return BucketStatus(exists=True, access="anonymous")
    except (PermissionError, HttpError) as e:
        # HttpError is raised by gcsfs for HTTP-level 401/403 responses;
        # treat 404 as not-found, anything else as access-denied.
        if isinstance(e, HttpError) and e.code == 404:
            return BucketStatus(
                exists=False,
                access="denied",
                error=f"GCS bucket '{name}' not found",
            )
        if anon_only:
            return BucketStatus(
                exists=True,
                access="denied",
                error=f"Access denied to GCS bucket '{name}'"
                " — check credentials/permissions",
            )
        # Anonymous access denied but credentials may still work: fall through.
    except FileNotFoundError:
        return BucketStatus(
            exists=False, access="denied", error=f"GCS bucket '{name}' not found"
        )

    # Step 2: Try with configured credentials
    auth_fs = cls.create_fs(**kwargs)
    try:
        sync(get_loop(), auth_fs._info, name)
        return BucketStatus(exists=True, access="authenticated")
    except (PermissionError, HttpError) as e:
        if isinstance(e, HttpError) and e.code == 404:
            return BucketStatus(
                exists=False,
                access="denied",
                error=f"GCS bucket '{name}' not found",
            )
        return BucketStatus(
            exists=True,
            access="denied",
            error=f"Access denied to GCS bucket '{name}'"
            " — check credentials/permissions",
        )
    except FileNotFoundError:
        return BucketStatus(
            exists=False, access="denied", error=f"GCS bucket '{name}' not found"
        )
    except (google_exceptions.Forbidden, google_exceptions.PermissionDenied) as e:
        return BucketStatus(exists=True, access="denied", error=str(e))

def url(
self,
path: str,
Expand Down
Loading
Loading