Skip to content

Commit 7ce409c

Browse files
authored
Merge pull request #1502 from MODSetter/fix/db-startup-index-lock-hang
hotpatch: Fix/db startup index lock hang
2 parents 284df84 + b9702b3 commit 7ce409c

4 files changed

Lines changed: 186 additions & 43 deletions

File tree

surfsense_backend/.env.example

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense
22

3+
# --- Database startup / safety knobs (optional) ---
4+
# Run extension/table/index DDL on app startup. Set FALSE when schema is owned
5+
# exclusively by Alembic migrations.
6+
# DB_BOOTSTRAP_ON_STARTUP=TRUE
7+
# lock_timeout (ms) for boot-time DDL so a contended CREATE INDEX/TABLE fails
8+
# fast instead of hanging the FastAPI lifespan behind another transaction.
9+
# DB_DDL_LOCK_TIMEOUT_MS=5000
10+
# idle_in_transaction_session_timeout (ms) so an abandoned "idle in transaction"
11+
# session can't wedge the DB indefinitely. 0 disables. (asyncpg only)
12+
# DB_IDLE_IN_TX_TIMEOUT_MS=900000
13+
# Same, for the Celery worker engine (long ingestion/podcast/video tasks). If a
14+
# task hasn't touched the DB in this window it's treated as orphaned and dropped.
15+
# 0 disables. (asyncpg only)
16+
# DB_CELERY_IDLE_IN_TX_TIMEOUT_MS=3600000
17+
318
# Deployment environment: dev or production
419
SURFSENSE_ENV=dev
520

surfsense_backend/app/config/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,28 @@ def is_cloud(cls) -> bool:
541541
# Database
542542
DATABASE_URL = os.getenv("DATABASE_URL")
543543

544+
# When TRUE (default) the app ensures extensions/tables/indexes exist on
545+
# startup. Set FALSE in environments where schema is owned exclusively by
546+
# Alembic migrations to skip all boot-time DDL.
547+
DB_BOOTSTRAP_ON_STARTUP = (
548+
os.getenv("DB_BOOTSTRAP_ON_STARTUP", "TRUE").upper() == "TRUE"
549+
)
550+
# Per-session lock_timeout (ms) applied to boot-time DDL so a contended
551+
# CREATE INDEX / CREATE TABLE fails fast instead of hanging the FastAPI
552+
# lifespan forever behind another transaction's lock.
553+
DB_DDL_LOCK_TIMEOUT_MS = int(os.getenv("DB_DDL_LOCK_TIMEOUT_MS", "5000"))
554+
# Global idle_in_transaction_session_timeout (ms) applied to every pooled
555+
# connection so an abandoned "idle in transaction" session can't wedge the
556+
# database indefinitely. 0 disables. Only applied to asyncpg connections.
557+
DB_IDLE_IN_TX_TIMEOUT_MS = int(os.getenv("DB_IDLE_IN_TX_TIMEOUT_MS", "900000"))
558+
# Same protection for the separate Celery worker engine, where long-running
559+
# ingestion/podcast/video tasks live. Kept higher than the web default so a
560+
# legitimate per-document embed window is never reaped: if a task hasn't
561+
# touched the DB in 60 min it's treated as orphaned and dropped. 0 disables.
562+
DB_CELERY_IDLE_IN_TX_TIMEOUT_MS = int(
563+
os.getenv("DB_CELERY_IDLE_IN_TX_TIMEOUT_MS", "3600000")
564+
)
565+
544566
# Celery / Redis
545567
# Redis (single endpoint for Celery broker, result backend, and app cache).
546568
# Legacy CELERY_BROKER_URL / CELERY_RESULT_BACKEND / REDIS_APP_URL still

surfsense_backend/app/db.py

Lines changed: 132 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import uuid
23
from collections.abc import AsyncGenerator
34
from contextlib import asynccontextmanager
@@ -34,6 +35,8 @@
3435
if config.AUTH_TYPE == "GOOGLE":
3536
from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID
3637

38+
logger = logging.getLogger(__name__)
39+
3740
DATABASE_URL = config.DATABASE_URL
3841

3942

@@ -2871,13 +2874,36 @@ def is_valid(self) -> bool:
28712874
PodcastStatus,
28722875
)
28732876

2877+
2878+
def _build_connect_args() -> dict:
2879+
"""Build driver connect_args, including a protective idle-in-transaction
2880+
timeout for asyncpg connections.
2881+
2882+
A single abandoned ``idle in transaction`` session can hold table/row locks
2883+
indefinitely and wedge writes plus boot-time DDL (the classic "FastAPI
2884+
stuck at application startup" failure). Setting
2885+
``idle_in_transaction_session_timeout`` server-side makes Postgres reap such
2886+
sessions automatically. It never affects sessions that are actively running
2887+
statements — only ones that opened a transaction and went idle.
2888+
"""
2889+
connect_args: dict = {}
2890+
idle_ms = config.DB_IDLE_IN_TX_TIMEOUT_MS
2891+
# ``server_settings`` is asyncpg-specific; only apply it for that driver.
2892+
if idle_ms and idle_ms > 0 and DATABASE_URL and "asyncpg" in DATABASE_URL:
2893+
connect_args["server_settings"] = {
2894+
"idle_in_transaction_session_timeout": str(idle_ms)
2895+
}
2896+
return connect_args
2897+
2898+
28742899
engine = create_async_engine(
28752900
DATABASE_URL,
28762901
pool_size=30,
28772902
max_overflow=150,
28782903
pool_recycle=1800,
28792904
pool_pre_ping=True,
28802905
pool_timeout=30,
2906+
connect_args=_build_connect_args(),
28812907
)
28822908
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
28832909

@@ -2902,54 +2928,117 @@ async def shielded_async_session():
29022928
await session.close()
29032929

29042930

2905-
async def setup_indexes():
2906-
async with engine.begin() as conn:
2907-
# Create indexes
2908-
# Document embedding indexes
2909-
await conn.execute(
2910-
text(
2911-
"CREATE INDEX IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)"
2912-
)
2913-
)
2914-
await conn.execute(
2915-
text(
2916-
"CREATE INDEX IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))"
2917-
)
2918-
)
2919-
# Document Chuck Indexes
2920-
await conn.execute(
2921-
text(
2922-
"CREATE INDEX IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)"
2923-
)
2924-
)
2925-
await conn.execute(
2926-
text(
2927-
"CREATE INDEX IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))"
2928-
)
2929-
)
2930-
# pg_trgm indexes for efficient ILIKE '%term%' searches on titles
2931-
# Critical for document mention picker (@mentions) to scale
2932-
await conn.execute(
2933-
text(
2934-
"CREATE INDEX IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)"
2935-
)
2936-
)
2937-
# B-tree index on search_space_id for fast filtering
2938-
await conn.execute(
2939-
text(
2940-
"CREATE INDEX IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)"
2941-
)
2942-
)
2943-
# Covering index for "recent documents" query - enables index-only scan
2944-
await conn.execute(
2945-
text(
2946-
"CREATE INDEX IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)"
2947-
)
2931+
# (index_name, table, CREATE statement). Built with CONCURRENTLY so an index
2932+
# build only takes a non-blocking ShareUpdateExclusiveLock — ingestion
2933+
# INSERT/UPDATE on documents/chunks keep flowing while the index builds, and a
2934+
# slow build can never freeze the FastAPI lifespan or block writers.
2935+
_INDEX_DEFINITIONS: list[tuple[str, str, str]] = [
2936+
(
2937+
"document_vector_index",
2938+
"documents",
2939+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS document_vector_index ON documents USING hnsw (embedding public.vector_cosine_ops)",
2940+
),
2941+
(
2942+
"document_search_index",
2943+
"documents",
2944+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS document_search_index ON documents USING gin (to_tsvector('english', content))",
2945+
),
2946+
(
2947+
"chucks_vector_index",
2948+
"chunks",
2949+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_vector_index ON chunks USING hnsw (embedding public.vector_cosine_ops)",
2950+
),
2951+
(
2952+
"chucks_search_index",
2953+
"chunks",
2954+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS chucks_search_index ON chunks USING gin (to_tsvector('english', content))",
2955+
),
2956+
# pg_trgm index for efficient ILIKE '%term%' searches on titles — critical
2957+
# for the document mention picker (@mentions) to scale.
2958+
(
2959+
"idx_documents_title_trgm",
2960+
"documents",
2961+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_title_trgm ON documents USING gin (title gin_trgm_ops)",
2962+
),
2963+
(
2964+
"idx_documents_search_space_id",
2965+
"documents",
2966+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_id ON documents (search_space_id)",
2967+
),
2968+
# Covering index for "recent documents" query — enables index-only scan.
2969+
(
2970+
"idx_documents_search_space_updated",
2971+
"documents",
2972+
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_documents_search_space_updated ON documents (search_space_id, updated_at DESC NULLS LAST) INCLUDE (id, title, document_type)",
2973+
),
2974+
]
2975+
2976+
2977+
async def _drop_invalid_index(conn, name: str) -> None:
2978+
"""Drop a leftover *invalid* index so it can be rebuilt.
2979+
2980+
A ``CREATE INDEX CONCURRENTLY`` that is interrupted (timeout, crash,
2981+
cancellation) leaves behind an ``indisvalid = false`` index. Because the
2982+
name now exists, a later ``CREATE INDEX CONCURRENTLY IF NOT EXISTS`` would
2983+
skip it and the broken index would persist forever. Detect and drop it
2984+
first.
2985+
"""
2986+
result = await conn.execute(
2987+
text("SELECT indisvalid FROM pg_index WHERE indexrelid = to_regclass(:n)"),
2988+
{"n": name},
2989+
)
2990+
row = result.first()
2991+
if row is not None and row[0] is False:
2992+
logger.warning(
2993+
"[startup] dropping invalid leftover index %s before rebuild", name
29482994
)
2995+
await conn.execute(text(f'DROP INDEX CONCURRENTLY IF EXISTS "{name}"'))
2996+
2997+
2998+
async def setup_indexes() -> None:
2999+
"""Ensure search/vector indexes exist without ever blocking startup.
3000+
3001+
Each index is created with ``CONCURRENTLY`` (so it never takes a blocking
3002+
SHARE lock on documents/chunks) under a short per-session ``lock_timeout``
3003+
(so a contended boot fails fast instead of hanging the lifespan forever).
3004+
Failures are logged and swallowed per-index — a missing index just gets
3005+
retried on the next boot rather than crash-looping the API.
3006+
"""
3007+
lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS)
3008+
# AUTOCOMMIT is mandatory: CREATE INDEX CONCURRENTLY cannot run inside a
3009+
# transaction block.
3010+
async with engine.connect() as base_conn:
3011+
conn = await base_conn.execution_options(isolation_level="AUTOCOMMIT")
3012+
await conn.execute(text(f"SET lock_timeout = {lock_timeout_ms}"))
3013+
for name, table, ddl in _INDEX_DEFINITIONS:
3014+
try:
3015+
await _drop_invalid_index(conn, name)
3016+
await conn.execute(text(ddl))
3017+
except Exception as exc:
3018+
# Non-fatal by design: a missing index is retried next boot.
3019+
logger.warning(
3020+
"[startup] index %s on %s not ready (%s: %s); "
3021+
"will retry on next boot",
3022+
name,
3023+
table,
3024+
exc.__class__.__name__,
3025+
exc,
3026+
)
29493027

29503028

29513029
async def create_db_and_tables():
3030+
if not config.DB_BOOTSTRAP_ON_STARTUP:
3031+
logger.info(
3032+
"[startup] DB bootstrap skipped (DB_BOOTSTRAP_ON_STARTUP=FALSE); "
3033+
"schema/indexes are expected to be managed by migrations"
3034+
)
3035+
return
3036+
3037+
lock_timeout_ms = int(config.DB_DDL_LOCK_TIMEOUT_MS)
29523038
async with engine.begin() as conn:
3039+
# Fail fast instead of hanging forever if another session holds a
3040+
# conflicting lock on a table we need to touch.
3041+
await conn.execute(text(f"SET LOCAL lock_timeout = {lock_timeout_ms}"))
29533042
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
29543043
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm"))
29553044
await conn.run_sync(Base.metadata.create_all)

surfsense_backend/app/tasks/celery_tasks/__init__.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,27 @@ def get_celery_session_maker() -> async_sessionmaker:
3232
"""
3333
global _celery_engine, _celery_session_maker
3434
if _celery_session_maker is None:
35+
# Reap connections orphaned mid-transaction (e.g. a worker that hung or
36+
# crashed mid-index) so they can't hold locks on documents/chunks and
37+
# wedge writes — the failure mode that previously left an "idle in
38+
# transaction" session holding locks for 11+ hours. Kept generous so a
39+
# legitimate long per-document embed window is never killed.
40+
connect_args: dict = {}
41+
idle_ms = config.DB_CELERY_IDLE_IN_TX_TIMEOUT_MS
42+
if (
43+
idle_ms
44+
and idle_ms > 0
45+
and config.DATABASE_URL
46+
and "asyncpg" in config.DATABASE_URL
47+
):
48+
connect_args["server_settings"] = {
49+
"idle_in_transaction_session_timeout": str(idle_ms)
50+
}
3551
_celery_engine = create_async_engine(
3652
config.DATABASE_URL,
3753
poolclass=NullPool,
3854
echo=False,
55+
connect_args=connect_args,
3956
)
4057
with contextlib.suppress(Exception):
4158
from app.observability.bootstrap import instrument_sqlalchemy_engine

0 commit comments

Comments
 (0)