Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
aa9a410
Add Databricks ingestion script and requirements file
awaismirza92 Dec 2, 2025
f329566
Make the injection function callable
awaismirza92 Dec 2, 2025
7c91e7b
Mention import of injection function
awaismirza92 Dec 2, 2025
6464559
Migrate to pyproject.toml & uv
awaismirza92 Dec 3, 2025
1eb9a88
Replace relative import with absolute import in Databricks ingestion
awaismirza92 Dec 9, 2025
dc86916
Remove pandas by streaming files to volume
awaismirza92 Dec 9, 2025
1011f94
Document DEFAULT profile, remove python & preparation
awaismirza92 Dec 10, 2025
0a780e3
Add parameter markers and pydantic validation to avoid SQL injection
awaismirza92 Dec 10, 2025
1fcbd4f
Rename variable name for clarity
awaismirza92 Dec 10, 2025
f86c976
Fix bug in string wrapping
awaismirza92 Dec 11, 2025
b3af7fc
Suppress INFO/WARNING from absl/glog
awaismirza92 Dec 12, 2025
375adfd
Remove the mutation of `os.environ`
awaismirza92 Dec 15, 2025
0a19854
Use module level constant for request timeout & increase it to 300s
awaismirza92 Dec 15, 2025
c006495
Add debug logging for successful volume file cleanup
awaismirza92 Dec 15, 2025
c4a0955
Move logging suppression to a dedicated function
awaismirza92 Dec 15, 2025
e009302
Remove ignore comment for model_config in TableConfig
awaismirza92 Dec 15, 2025
962be5b
Remove trailing blank lines at end of pyproject.toml
awaismirza92 Dec 15, 2025
e46fe23
Close spark session if the function creates it
awaismirza92 Dec 15, 2025
da01415
Use exact matches for databricks group in pyproject.toml
awaismirza92 Dec 15, 2025
447d527
Remove comments leaking LLM focus from ingestion module
awaismirza92 Dec 15, 2025
00f30c5
Use exact version specifications for databricks dependencies in uv.lock
awaismirza92 Dec 15, 2025
bcbc393
Replace SQL identifier validation with quoting using sqlglot
awaismirza92 Dec 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Integration modules for connecting getML with external platforms."""
120 changes: 120 additions & 0 deletions integration/databricks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Databricks Data Integration

This directory contains modules for ingesting data from GCS into Databricks Delta Lake and preparing population tables for getML feature engineering.


## Prerequisites

- **Python 3.12**
- **Databricks Free Edition account** (or higher tier)
- **Databricks CLI** installed

## Setup

### 1. Install Databricks CLI

```bash
# macOS
brew install databricks/tap/databricks

# Linux & macOS & Windows
curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh
```

More: https://docs.databricks.com/gcp/en/dev-tools/cli/install

### 2. Create Python Virtual Environment

```bash
# Navigate to this directory
cd integration/databricks

# Create virtual environment with Python 3.12
python3.12 -m venv .venv

# Activate it
source .venv/bin/activate # macOS/Linux
# .venv\Scripts\activate # Windows

# Install dependencies
pip install -r requirements.txt
```

### 3. Authenticate with Databricks

```bash
# Get your workspace URL from your Databricks Free Edition account
# It looks like: https://<workspace-id>.cloud.databricks.com

databricks auth login --host https://<your-workspace>.cloud.databricks.com
```

This will open a browser for OAuth authentication. After successful login, your credentials are cached locally.

### 4. Verify Authentication

```bash
databricks auth profiles
```

You should see your workspace listed.

## Usage

### Python API (Recommended)

Use the modules directly in notebooks or scripts:

Comment thread
awaismirza92 marked this conversation as resolved.
```python
from integration.databricks.data import ingestion, preparation

# Load raw data from GCS to Databricks
loaded_tables = ingestion.load_from_gcs(
bucket="https://static.getml.com/datasets/jaffle_shop/",
destination_schema="jaffle_shop"
)
print(f"Loaded {len(loaded_tables)} tables")
```

### Load Specific Tables

```python
from integration.databricks.data import ingestion

# Load only the tables you need
ingestion.load_from_gcs(
destination_schema="RAW",
tables=["raw_customers", "raw_orders", "raw_items", "raw_products"]
)
```

## Troubleshooting

### Authentication Errors

```bash
# Re-authenticate
databricks auth login --host https://<your-workspace>.cloud.databricks.com

# Check your profile
databricks auth env
```

### Python Version Issues

Databricks serverless requires Python 3.12:

Comment thread
awaismirza92 marked this conversation as resolved.
Outdated
```bash
python --version # Should show 3.12.x

# If not, install Python 3.12 and recreate venv
brew install python@3.12 # macOS
```

### Connection Timeout

Free Edition has limited compute resources. If you see timeouts:
- Wait a few minutes and retry (serverless cold start can take few seconds or minutes)
- Check your quota in the Databricks workspace


1 change: 1 addition & 0 deletions integration/databricks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Databricks integration for getML demos."""
Empty file.
254 changes: 254 additions & 0 deletions integration/databricks/data/ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
"""
GCS to Databricks Delta Lake ingestion module.

This module provides functions to load parquet files from GCS
and write them as Delta tables in Databricks.

Example:
# Assuming running from root of repository
from integration.databricks.data import ingestion

# Load all jaffle_shop tables
ingestion.load_from_gcs(
bucket="https://static.getml.com/datasets/jaffle_shop/",
destination_schema="jaffle_shop"
)

# Or load specific tables
ingestion.load_from_gcs(
bucket="https://static.getml.com/datasets/jaffle_shop/",
destination_schema="jaffle_shop",
tables=["raw_customers", "raw_orders"]
)
"""

from __future__ import annotations

import io
import logging
import os
import tempfile
from collections.abc import Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Final

import pandas as pd
import requests
from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession

logger = logging.getLogger(__name__)

# Default configuration
DEFAULT_BUCKET: Final[str] = "https://static.getml.com/datasets/jaffle_shop"
DEFAULT_CATALOG: Final[str] = "workspace"
DEFAULT_SCHEMA: Final[str] = "jaffle_shop"
DEFAULT_PROFILE: Final[str] = "Code17"
Comment thread
awaismirza92 marked this conversation as resolved.
Outdated

JAFFLE_SHOP_TABLES: Final[tuple[str, ...]] = (
"raw_customers",
"raw_items",
"raw_orders",
"raw_products",
"raw_stores",
"raw_supplies",
"raw_tweets",
)


@dataclass(frozen=True)
class TableConfig:
"""Configuration for a Delta table to be created."""

source_url: str
table_name: str
catalog: str
schema: str

@property
Comment thread
awaismirza92 marked this conversation as resolved.
def full_table_name(self) -> str:
"""Return fully qualified table name."""
return f"{self.catalog}.{self.schema}.{self.table_name}"


def _download_parquet(url: str) -> pd.DataFrame:
"""
Comment thread
awaismirza92 marked this conversation as resolved.
Outdated
Download a parquet file from URL and return as pandas DataFrame.

Falls back to saving to temp file if direct bytes reading fails.
"""
logger.info(f"Downloading: {url}")
response = requests.get(url, timeout=120)
response.raise_for_status()

try:
return pd.read_parquet(io.BytesIO(response.content)) # pyright: ignore[reportUnknownMemberType]
except Exception:
logger.warning("Direct read failed, using temp file fallback")
with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
_ = tmp.write(response.content)
tmp_path = tmp.name
try:
return pd.read_parquet(tmp_path) # pyright: ignore[reportUnknownMemberType]
finally:
Path(tmp_path).unlink(missing_ok=True)


def _create_spark_session(profile: str | None = None) -> SparkSession:
"""
Create a Databricks Spark session using serverless compute.

Uses Databricks CLI authentication. If profile is not specified,
uses DATABRICKS_CONFIG_PROFILE env var or defaults to 'DEFAULT'.

Args:
profile: Databricks CLI profile name (optional).

Returns:
SparkSession connected to Databricks serverless compute.

Raises:
Comment thread
awaismirza92 marked this conversation as resolved.
Comment thread
awaismirza92 marked this conversation as resolved.
RuntimeError: If connection fails.
"""
logger.info("Creating Databricks Spark session (serverless)...")
try:
profile_name = profile or os.environ.get(
"DATABRICKS_CONFIG_PROFILE", DEFAULT_PROFILE
)

if profile_name:
logger.info(f"Using Databricks profile: {profile_name}")
os.environ["DATABRICKS_CONFIG_PROFILE"] = profile_name

spark = DatabricksSession.builder.serverless().getOrCreate()

logger.info("Successfully connected to Databricks serverless compute")
return spark
except Exception as e:
logger.error(f"Failed to create Databricks session: {e}")
logger.error(
"Make sure you've authenticated with: databricks auth login --host <workspace-url>"
)
logger.error(
"If using a named profile, set DATABRICKS_CONFIG_PROFILE=<profile-name>"
)
raise RuntimeError(f"Failed to connect to Databricks: {e}") from e


def _write_to_delta(
spark: SparkSession,
pdf: pd.DataFrame,
config: TableConfig,
) -> None:
"""Write a pandas DataFrame to Delta Lake as a managed table."""
logger.info(f"Writing to Delta table: {config.full_table_name}")

sdf: DataFrame = spark.createDataFrame(pdf) # pyright: ignore[reportUnknownMemberType]

sdf.write.format("delta").mode("overwrite").option(
"overwriteSchema", "true"
).saveAsTable(config.full_table_name)

Comment thread
awaismirza92 marked this conversation as resolved.
logger.info(f"Successfully wrote {len(pdf)} rows to {config.full_table_name}")


def _ensure_schema_exists(spark: SparkSession, catalog: str, schema: str) -> None:
"""Ensure the target schema exists, create if necessary."""
logger.info(f"Ensuring schema exists: {catalog}.{schema}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}") # pyright: ignore[reportUnknownMemberType]


def load_from_gcs(
bucket: str = DEFAULT_BUCKET,
destination_schema: str = DEFAULT_SCHEMA,
destination_catalog: str = DEFAULT_CATALOG,
tables: Sequence[str] | None = None,
spark: SparkSession | None = None,
profile: str | None = None,
) -> list[str]:
"""
Load parquet files from GCS bucket to Databricks Delta Lake.

Downloads parquet files from the specified GCS bucket and writes them
as Delta tables in Databricks.

Args:
bucket: GCS bucket URL (default: jaffle_shop dataset).
destination_schema: Target schema name in Databricks.
destination_catalog: Target catalog name in Databricks.
tables: List of table names to load. If None, loads all jaffle_shop tables.
spark: Optional existing SparkSession. If None, creates a new one.
profile: Databricks CLI profile name (optional).
Comment thread
awaismirza92 marked this conversation as resolved.

Returns:
List of successfully loaded table names.

Example:
>>> from integration.databricks.data import ingestion
>>> loaded = ingestion.load_from_gcs(
... bucket="https://static.getml.com/datasets/jaffle_shop/",
... destination_schema="RAW"
... )
>>> print(f"Loaded {len(loaded)} tables")
"""
table_names = list(tables) if tables else list(JAFFLE_SHOP_TABLES)

logger.info(f"Loading {len(table_names)} tables from {bucket}")
logger.info(f"Destination: {destination_catalog}.{destination_schema}")

if spark is None:
spark = _create_spark_session(profile=profile)

_ensure_schema_exists(spark, destination_catalog, destination_schema)

# Build table configs
table_configs = [
TableConfig(
source_url=f"{bucket.rstrip('/')}/{name}.parquet",
table_name=name,
catalog=destination_catalog,
schema=destination_schema,
)
for name in table_names
]

# Load each table
loaded_tables: list[str] = []
for config in table_configs:
try:
pdf = _download_parquet(config.source_url)
_write_to_delta(spark, pdf, config)
loaded_tables.append(config.table_name)
except requests.RequestException as e:
logger.error(f"Failed to download {config.source_url}: {e}")
except Exception as e:
logger.error(f"Failed to write {config.table_name}: {e}")

logger.info(f"Successfully loaded {len(loaded_tables)}/{len(table_names)} tables")
return loaded_tables


def list_tables(
schema: str = DEFAULT_SCHEMA,
catalog: str = DEFAULT_CATALOG,
spark: SparkSession | None = None,
profile: str | None = None,
) -> list[str]:
"""
List all tables in the specified schema.

Args:
schema: Schema name.
catalog: Catalog name.
spark: Optional existing SparkSession.
profile: Databricks CLI profile name (optional).

Returns:
List of table names.
"""
if spark is None:
spark = _create_spark_session(profile=profile)

rows = spark.sql(f"SHOW TABLES IN {catalog}.{schema}").collect() # pyright: ignore[reportUnknownMemberType]
return [row.tableName for row in rows] # pyright: ignore[reportAny]
Loading