Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/datachain/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datachain.client import BucketStatus, bucket_status
from datachain.lib.data_model import DataModel, DataType, is_chain_type
from datachain.lib.dc import (
C,
Expand Down Expand Up @@ -53,6 +54,7 @@
"Audio",
"AudioFile",
"AudioFragment",
"BucketStatus",
"C",
"Column",
"ColumnExpr",
Expand All @@ -75,6 +77,7 @@
"VideoFile",
"VideoFragment",
"VideoFrame",
"bucket_status",
"create_project",
"datasets",
"delete_dataset",
Expand Down
17 changes: 17 additions & 0 deletions src/datachain/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datachain.studio import process_pipeline_args

from .commands import (
bucket_status_cmd,
clear_cache,
completion,
du,
Expand Down Expand Up @@ -89,6 +90,7 @@ def handle_command(args, catalog, client_config) -> int:
from datachain.studio import process_auth_cli_args, process_jobs_args

command_handlers = {
"bucket": lambda: handle_bucket_command(args, client_config),
"cp": lambda: handle_cp_command(args, catalog),
"clone": lambda: handle_clone_command(args, catalog),
"dataset": lambda: handle_dataset_command(args, catalog),
Expand Down Expand Up @@ -117,6 +119,21 @@ def handle_command(args, catalog, client_config) -> int:
return 1


def handle_bucket_command(args, client_config) -> int:
    """Dispatch the `datachain bucket` CLI subcommands.

    Returns a process exit code: 1 when no subcommand was given (usage
    error), otherwise whatever the subcommand handler returns. Raises
    NotImplementedError for a subcommand this dispatcher does not know.
    """
    subcommand = args.bucket_cmd
    if subcommand is None:
        # No subcommand given: point the user at the help text.
        print(
            f"Use 'datachain {args.command} --help' to see available options",
            file=sys.stderr,
        )
        return 1
    if subcommand == "status":
        # Copy the config so the caller's dict is never mutated.
        cfg = dict(client_config)
        account_name = getattr(args, "account_name", None)
        if account_name:
            cfg["account_name"] = account_name
        return bucket_status_cmd(args.uri, client_config=cfg)
    raise NotImplementedError(f"Unexpected bucket subcommand: {subcommand}")


def handle_cp_command(args, catalog):
catalog.cp(
args.sources,
Expand Down
2 changes: 2 additions & 0 deletions src/datachain/cli/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .bucket import bucket_status_cmd
from .datasets import edit_dataset, list_datasets, list_datasets_local, rm_dataset
from .du import du
from .index import index
Expand All @@ -7,6 +8,7 @@
from .skill import install_skills, list_skills, uninstall_skills

__all__ = [
"bucket_status_cmd",
"clear_cache",
"completion",
"du",
Expand Down
21 changes: 21 additions & 0 deletions src/datachain/cli/commands/bucket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import sys

from datachain.client import bucket_status


def bucket_status_cmd(uri: str, client_config: dict | None = None) -> int:
    """Report existence and access level of a bucket/container.

    Prints a short human-readable summary to stdout, and any error detail
    to stderr. Returns 0 when the bucket exists and 1 when it does not.
    Network-level failures propagate as exceptions.
    """
    cfg = client_config or {}
    status = bucket_status(uri, **cfg)

    if not status.exists:
        print("Status: not found")
    else:
        print("Status: exists")
        print(f"Access: {status.access}")
    if status.error:
        print(f"Error: {status.error}", file=sys.stderr)
    return 0 if status.exists else 1
35 changes: 35 additions & 0 deletions src/datachain/cli/parser/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,41 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915
subp.add_parser("internal-run-udf", parents=[parent_parser])
subp.add_parser("internal-run-udf-worker", parents=[parent_parser])

parse_bucket = subp.add_parser(
"bucket",
parents=[parent_parser],
description="Commands for cloud storage buckets.",
formatter_class=CustomHelpFormatter,
)
bucket_subparsers = parse_bucket.add_subparsers(
dest="bucket_cmd",
help="Use `datachain bucket CMD --help` for command-specific help",
)

parse_bucket_status = bucket_subparsers.add_parser(
"status",
parents=[parent_parser],
description=(
"Check existence and access level of a bucket without listing objects."
),
formatter_class=CustomHelpFormatter,
)
parse_bucket_status.add_argument(
"uri",
type=str,
help=(
"Bucket URI to check, e.g. s3://my-bucket/, gs://my-bucket/, "
"az://my-container/."
),
)
parse_bucket_status.add_argument(
"--account-name",
dest="account_name",
type=str,
default=None,
help="Azure storage account name (required for anonymous access detection).",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it required for the non-anonymous access check as well? In some cases, I think the key doesn't include the account name.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave it as it is. If the key doesn't include the account name, it raises an error saying so.

)

add_completion_parser(subp, [parent_parser])
return parser

Expand Down
25 changes: 23 additions & 2 deletions src/datachain/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
from .fsspec import Client
from .fsspec import BucketStatus, Client

__all__ = ["Client"]

def bucket_status(uri: str, **client_config) -> BucketStatus:
    """Probe a bucket for existence and access level without listing objects.

    Args:
        uri: Bucket URI, e.g. "s3://my-bucket/", "gs://my-bucket/",
            "az://my-container/". Must name only the bucket — no object path.
        **client_config: Storage client configuration (aws_key, etc.).
            For Azure, pass ``account_name`` to enable anonymous access
            detection; without it, only authenticated access is probed.

    Returns:
        BucketStatus(exists, access) where access is one of:
        'anonymous', 'authenticated', 'denied'

    Raises:
        ValueError: if the URI contains a path component inside the bucket.
    """
    implementation = Client.get_implementation(uri)
    name, path = implementation.split_url(uri)
    if path:
        raise ValueError(f"path in a bucket is not allowed, only bucket name: {uri!r}")
    return implementation.bucket_status(name, **client_config)


__all__ = ["BucketStatus", "Client", "bucket_status"]
54 changes: 53 additions & 1 deletion src/datachain/client/azure.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,70 @@
from typing import Any

from adlfs import AzureBlobFileSystem
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceNotFoundError,
)
from azure.storage.blob import BlobServiceClient
from fsspec.asyn import get_loop, sync

from datachain.lib.file import File
from datachain.progress import tqdm

from .fsspec import DELIMITER, Client, ResultQueue
from .fsspec import DELIMITER, BucketStatus, Client, ResultQueue


class AzureClient(Client):
FS_CLASS = AzureBlobFileSystem
PREFIX = "az://"
protocol = "az"

@classmethod
def bucket_status(cls, name: str, **kwargs) -> BucketStatus:
    """Check existence and access level of the Azure container *name*.

    Probes anonymously first (only when ``account_name`` is supplied in
    *kwargs*), then with whatever credentials ``create_fs`` resolves.
    Network errors propagate as exceptions.

    Returns:
        BucketStatus(exists, access, error) with access one of
        'anonymous', 'authenticated', 'denied'.
    """
    # Step 1: Anonymous probe — uses BlobServiceClient directly (not adlfs)
    # to avoid picking up credentials from environment variables like
    # AZURE_STORAGE_CONNECTION_STRING.
    account_name = kwargs.get("account_name")
    if account_name:
        try:
            url = f"https://{account_name}.blob.core.windows.net"
            anon_client = BlobServiceClient(account_url=url)
            anon_client.get_container_client(name).get_container_properties()
        except ClientAuthenticationError:
            # Anonymous access refused; fall through to the credentialed probe.
            pass
        except ResourceNotFoundError:
            return BucketStatus(
                exists=False,
                access="denied",
                error=f"Azure container '{name}' not found",
            )
        except HttpResponseError as e:
            # 401/403 mean "exists but not public" — try credentials next;
            # anything else is a real (e.g. network/service) error.
            if e.status_code not in (401, 403):
                raise
        else:
            return BucketStatus(exists=True, access="anonymous")

    # Step 2: Authenticated probe via the regular adlfs filesystem.
    try:
        auth_fs = cls.create_fs(**kwargs)
        sync(get_loop(), auth_fs._info, name)
    except (PermissionError, ClientAuthenticationError):
        return BucketStatus(
            exists=True,
            access="denied",
            error=f"Access denied to Azure container '{name}'"
            " — check credentials/configuration",
        )
    except FileNotFoundError:
        return BucketStatus(
            exists=False,
            access="denied",
            error=f"Azure container '{name}' not found",
        )
    except ValueError as e:
        return BucketStatus(exists=False, access="denied", error=str(e))
    return BucketStatus(exists=True, access="authenticated")

def info_to_file(self, v: dict[str, Any], path: str) -> File:
version_id = v.get("version_id") if self._is_version_aware() else None
return File(
Expand Down
20 changes: 19 additions & 1 deletion src/datachain/client/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from collections.abc import AsyncIterator, Iterator, Sequence
from datetime import datetime
from shutil import copy2
from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, NamedTuple
from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, Literal, NamedTuple
from urllib.parse import urlparse

from dvc_objects.fs.system import reflink
Expand Down Expand Up @@ -57,6 +57,12 @@ class Bucket(NamedTuple):
created: datetime | None


class BucketStatus(NamedTuple):
exists: bool
access: Literal["anonymous", "authenticated", "denied"]
Comment thread
amritghimire marked this conversation as resolved.
error: str | None = None


class Client(ABC):
MAX_THREADS = multiprocessing.cpu_count()
FS_CLASS: ClassVar[type["AbstractFileSystem"]]
Expand Down Expand Up @@ -177,6 +183,18 @@ def ls_buckets(cls, **kwargs) -> Iterator[Bucket]:
created=entry.get("CreationDate"),
)

@classmethod
def bucket_status(cls, name: str, **kwargs) -> "BucketStatus":
    """Probe bucket *name* for existence and access level without listing objects.

    Concrete clients override this; the base implementation always raises.

    Returns a BucketStatus with:
      - exists: whether the bucket/container exists
      - access: 'anonymous' | 'authenticated' | 'denied'

    Network errors propagate as exceptions.
    """
    raise NotImplementedError(f"bucket_status is not supported for {cls.__name__}")

@classmethod
def is_root_url(cls, url) -> bool:
    """Return True when *url* is exactly this client's scheme prefix (e.g. "s3://")."""
    return cls.PREFIX == url
Expand Down
54 changes: 53 additions & 1 deletion src/datachain/client/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
from fsspec.asyn import get_loop, sync
from fsspec.callbacks import DEFAULT_CALLBACK, Callback
from gcsfs import GCSFileSystem
from gcsfs.retry import HttpError

from datachain.client.fileslice import FileWrapper
from datachain.lib.file import File
from datachain.progress import tqdm

from .fsspec import DELIMITER, Client, ResultQueue
from .fsspec import DELIMITER, BucketStatus, Client, ResultQueue

# Patch gcsfs for consistency with s3fs
GCSFileSystem.set_session = GCSFileSystem._set_session
Expand All @@ -38,6 +39,57 @@ def create_fs(cls, **kwargs) -> GCSFileSystem:

return cast("GCSFileSystem", super().create_fs(**kwargs))

@classmethod
def bucket_status(cls, name: str, **kwargs) -> BucketStatus:  # noqa: PLR0911
    """Check existence and access level of the GCS bucket *name*.

    Probes anonymously first, then with resolved credentials. Network
    errors propagate as exceptions.
    """
    from google.api_core import exceptions as google_exceptions

    def _not_found() -> BucketStatus:
        # Shared "bucket does not exist" result.
        return BucketStatus(
            exists=False, access="denied", error=f"GCS bucket '{name}' not found"
        )

    # Step 1: Anonymous probe.
    # Use _ls (objects.list API) not _info (buckets.get API): GCS does not
    # grant storage.buckets.get anonymously even for public buckets.
    anon_kwargs = {k: v for k, v in kwargs.items() if k != "anon"}
    anon_kwargs["anon"] = True
    anon_fs = cls.create_fs(**anon_kwargs)
    try:
        sync(get_loop(), anon_fs._ls, name)
    except FileNotFoundError:
        return _not_found()
    except (PermissionError, HttpError, OSError) as e:
        if isinstance(e, HttpError) and e.code == 404:
            return _not_found()
        # Anonymous access refused — fall through to the credentialed probe.
    else:
        return BucketStatus(exists=True, access="anonymous")

    # Step 2: Authenticated probe — create_fs resolves credentials from
    # kwargs, environment, or application-default credentials.
    auth_fs = cls.create_fs(**kwargs)
    try:
        sync(get_loop(), auth_fs._info, name)
    except FileNotFoundError:
        return _not_found()
    except (google_exceptions.Forbidden, google_exceptions.PermissionDenied) as e:
        return BucketStatus(exists=True, access="denied", error=str(e))
    except (PermissionError, HttpError, OSError) as e:
        if isinstance(e, HttpError) and e.code == 404:
            return _not_found()
        return BucketStatus(
            exists=True,
            access="denied",
            error=f"Access denied to GCS bucket '{name}'"
            " — check credentials/permissions",
        )
    return BucketStatus(exists=True, access="authenticated")

def url(
self,
path: str,
Expand Down
Loading
Loading