Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added backend/.coverage
Binary file not shown.
13 changes: 0 additions & 13 deletions backend/app/adapters/outbound/db/celery_worker_db.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
"""Sync DB access for Celery workers (psycopg2).

The FastAPI app uses ``asyncpg`` + ``AsyncSession`` (see ``session.py``). Celery
prefork workers must **not** reuse that async engine: asyncpg connections are
bound to a specific asyncio loop, and ``asyncio.run()`` per task creates a new
loop → "Future attached to a different loop" / "Event loop is closed".

Workers use a separate **synchronous** engine built from the same ``DATABASE_URL``
by swapping ``+asyncpg`` for ``+psycopg2`` (``psycopg2-binary`` is already a
dependency).
"""

from __future__ import annotations

from sqlalchemy import create_engine
Expand Down Expand Up @@ -43,5 +31,4 @@ def _get_sessionmaker() -> sessionmaker[Session]:


def worker_session() -> Session:
"""Return a new sync session (caller must ``commit`` / ``rollback`` / ``close``)."""
return _get_sessionmaker()()
1 change: 0 additions & 1 deletion backend/app/adapters/outbound/db/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@


def _sync_url(url: str) -> str:
# Alembic uses a sync engine.
return url.replace("+asyncpg", "+psycopg2")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "10df2a717cda"
down_revision = "20260502_0003"
branch_labels = None
Expand Down Expand Up @@ -56,11 +55,9 @@ def upgrade() -> None:
unique=False,
)
op.drop_constraint(op.f("refresh_tokens_jti_key"), "refresh_tokens", type_="unique")
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_unique_constraint(
op.f("refresh_tokens_jti_key"),
"refresh_tokens",
Expand Down Expand Up @@ -113,4 +110,3 @@ def downgrade() -> None:
["email"],
unique=False,
)
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

def _document_orm_to_entity(row: DocumentORM) -> Document:
return Document(
id=row.id,
org_id=row.org_id,
id=UUID(row.id) if isinstance(row.id, str) else row.id,
org_id=UUID(row.org_id) if isinstance(row.org_id, str) else row.org_id,
title=row.title,
source_type=row.source_type,
storage_url=row.storage_url,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@

def _to_domain(model: OrganizationMembershipORM) -> OrganizationMembership:
return OrganizationMembership(
id=model.id,
org_id=model.org_id,
user_id=model.user_id,
id=UUID(model.id) if isinstance(model.id, str) else model.id,
org_id=UUID(model.org_id) if isinstance(model.org_id, str) else model.org_id,
user_id=(
UUID(model.user_id) if isinstance(model.user_id, str) else model.user_id
),
role=UserRole(model.role),
status=MembershipStatus(model.status),
joined_at=model.joined_at,
invited_by_user_id=model.invited_by_user_id,
invited_by_user_id=(
UUID(model.invited_by_user_id)
if isinstance(model.invited_by_user_id, str) and model.invited_by_user_id
else None
),
created_at=model.created_at,
updated_at=model.updated_at,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

def _to_domain(model: OrganizationORM) -> Organization:
return Organization(
id=model.id,
id=UUID(model.id) if isinstance(model.id, str) else model.id,
name=model.name,
slug=model.slug,
api_key_hash=model.api_key_hash,
Expand Down
15 changes: 10 additions & 5 deletions backend/app/adapters/outbound/db/repositories/uow_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,24 @@
organization_repository_impl,
user_repository_impl,
)
from app.domain.ports.outbound.chat_message import ChatMessageRepository
from app.domain.ports.outbound.chat_session import ChatSessionRepository
from app.domain.ports.outbound.membership_repository import MembershipRepository
from app.domain.ports.outbound.organization_repository import OrganizationRepository
from app.domain.ports.outbound.unit_of_work import UnitOfWork
from app.domain.ports.outbound.user_repository import UserRepository


class SQLAlchemyUnitOfWork(UnitOfWork):
def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:
self._session_factory = session_factory
self._session: AsyncSession | None = None
self._committed = False
self.users = None
self.organizations = None
self.memberships = None
self.sessions = None
self.messages = None
self.users: UserRepository | None = None
self.organizations: OrganizationRepository | None = None
self.memberships: MembershipRepository | None = None
self.sessions: ChatSessionRepository | None = None
self.messages: ChatMessageRepository | None = None

async def __aenter__(self) -> "SQLAlchemyUnitOfWork":
self._session = self._session_factory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

def _to_domain(model: UserORM) -> User:
return User(
id=model.id,
id=UUID(model.id) if isinstance(model.id, str) else model.id,
email=model.email,
full_name=model.full_name,
password_hash=model.password_hash,
Expand Down
6 changes: 0 additions & 6 deletions backend/app/adapters/outbound/email/email_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@


class NullEmailSender(EmailSender):
"""No-op when SMTP is disabled or not configured."""

def send_invitation_email(
self,
Expand All @@ -31,11 +30,6 @@ def send_invitation_email(


class SmtpEmailSender(EmailSender):
"""
Renders ``invitation_email.mjml`` with Jinja, compiles MJML to HTML via the
``mjml`` CLI, and sends mail over SMTP (e.g. Gmail :587 STARTTLS or Brevo SMTP).
"""

def __init__(
self,
*,
Expand Down
12 changes: 8 additions & 4 deletions backend/app/adapters/outbound/llm/openai_provider.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

from typing import AsyncGenerator
from typing import AsyncGenerator, cast

from openai import AsyncOpenAI
from openai import AsyncOpenAI, AsyncStream
from openai.types.chat import ChatCompletionChunk

from app.domain.ports.outbound.llm_port import LLMPort

Expand All @@ -20,11 +21,14 @@ async def generate_response_stream(
) -> AsyncGenerator[str, None]:
response = await self._client.chat.completions.create(
model=self._model,
messages=messages,
messages=messages, # type: ignore[arg-type]
temperature=temperature,
stream=True,
)
async for chunk in response:
stream: AsyncStream[ChatCompletionChunk] = cast(
AsyncStream[ChatCompletionChunk], response
)
async for chunk in stream:
delta = chunk.choices[0].delta if chunk.choices else None
if delta and delta.content:
yield delta.content
8 changes: 0 additions & 8 deletions backend/app/adapters/outbound/loaders/docx_loader.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
"""DOCX (Office Open XML) document loader.

Uses ``python-docx`` to walk the document body in order, including paragraphs
and table cells, so the extracted text preserves the reading order a human
sees in Word.
"""

from __future__ import annotations

import io
Expand Down Expand Up @@ -37,7 +30,6 @@ def load_text(self, *, data: bytes, source_type: str) -> str:
raise ValueError("Invalid or corrupted DOCX file") from exc

parts: list[str] = []
# Walk the body in document order so paragraphs and tables stay interleaved.
body = doc.element.body
for child in body.iterchildren():
tag = child.tag
Expand Down
7 changes: 0 additions & 7 deletions backend/app/adapters/outbound/loaders/markdown_loader.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
"""Backwards-compatible re-export.

Markdown loading is currently the same as plain text decoding; we keep a
separate import path so adding a real markdown stripper later doesn't break
callers.
"""

from __future__ import annotations

from app.adapters.outbound.loaders.text_loader import TextDocumentLoader
Expand Down
19 changes: 2 additions & 17 deletions backend/app/adapters/outbound/loaders/pdf_loader.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
"""PDF document loader.

Uses ``pypdf`` (pure Python, MIT-licensed) to extract page text. Pages are
joined with a form-feed separator so downstream chunkers can recover page
boundaries if needed.

Edge cases:
* Encrypted PDFs without a password yield empty text rather than raising.
* Non-PDF bytes raise ``ValueError`` with a clear message instead of
bubbling pypdf's internal errors to the worker.
"""

from __future__ import annotations

import io
Expand Down Expand Up @@ -39,25 +27,22 @@ def load_text(self, *, data: bytes, source_type: str) -> str:

if reader.is_encrypted:
try:
# pypdf returns 0 if decryption fails or 1/2 on success.
if reader.decrypt("") == 0:
logger.warning(
"Encrypted PDF could not be decrypted with empty password"
)
return ""
except Exception: # noqa: BLE001 — decryption can raise broadly
except Exception:
logger.exception("Failed to decrypt PDF")
return ""

pages: list[str] = []
for page_number, page in enumerate(reader.pages, start=1):
try:
text = page.extract_text() or ""
except Exception: # noqa: BLE001 — keep going past poison pages
except Exception:
logger.exception("Failed to extract text from PDF page %d", page_number)
text = ""
if text:
pages.append(text)
# Form-feed (\f) is the conventional page separator; chunker treats it
# as whitespace.
return "\n\f\n".join(pages)
6 changes: 0 additions & 6 deletions backend/app/adapters/outbound/loaders/text_loader.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
"""Loader for plain-text and markdown documents.

Both formats decode as UTF-8 (with replacement on bad bytes) — markdown is
treated as text for retrieval; we don't try to render or strip syntax.
"""

from __future__ import annotations

from app.domain.ports.outbound.document_loader import DocumentLoader
Expand Down
4 changes: 0 additions & 4 deletions backend/app/adapters/outbound/parser/parser_factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""Factory for parsing different document types."""

from __future__ import annotations

import logging
Expand All @@ -26,8 +24,6 @@ def __init__(self):
".doc": self._docx_parser,
".txt": self._text_parser,
".text": self._text_parser,
# ".md": self._markdown_parser,
# ".markdown": self._markdown_parser,
}

def extract_text(self, file_stream: BinaryIO, file_path: str) -> str:
Expand Down
2 changes: 0 additions & 2 deletions backend/app/adapters/outbound/rerank/reranker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""Reranker adapter implementation."""

from app.domain.ports.outbound.reranker import Reranker
from app.domain.value_objects.document import Document

Expand Down
5 changes: 0 additions & 5 deletions backend/app/adapters/outbound/storage/local_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,14 @@ def put_object(
) -> str:
target = self._resolve(key)
target.parent.mkdir(parents=True, exist_ok=True)
# Make every directory between base_dir and the file traversable so
# other-uid processes (Celery worker) can chdir/open through them.
for parent in _parents_below(target.parent, self._base_dir):
_ensure_mode(parent, DIR_MODE)
# Atomic write: write to temp in same dir, fsync, then rename.
fd, tmp_path = tempfile.mkstemp(prefix=".tmp-", dir=str(target.parent))
try:
with os.fdopen(fd, "wb") as fh:
fh.write(data)
fh.flush()
os.fsync(fh.fileno())
# mkstemp creates 0600; chmod *before* rename so the visible file
# is never momentarily unreadable to the worker.
os.chmod(tmp_path, FILE_MODE)
os.replace(tmp_path, target)
except Exception:
Expand Down
5 changes: 4 additions & 1 deletion backend/app/adapters/outbound/vector/pgvector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ async def query_by_similarity(
self,
*,
query_vector: list[float],
org_id: UUID | str,
org_id: str,
top_k: int = 5,
limit: int = 50,
alpha: float = 0.5,
user_reranker: bool = True,
namespace: str | None = None,
) -> list[dict[str, Any]]:
if not query_vector or not len(query_vector):
Expand Down
5 changes: 4 additions & 1 deletion backend/app/adapters/outbound/vector/pinecone_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ async def query_by_similarity(
self,
*,
query_vector: list[float],
org_id: UUID | str,
org_id: str,
top_k: int = 5,
limit: int = 50,
alpha: float = 0.5,
user_reranker: bool = True,
namespace: str | None = None,
) -> list[dict[str, Any]]:
if not query_vector or not len(query_vector):
Expand Down
4 changes: 0 additions & 4 deletions backend/app/adapters/outbound/vector/vector_retriever.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""Vector retriever adapter implementation."""

from __future__ import annotations

from typing import Any
Expand All @@ -10,8 +8,6 @@


class VectorRetriever:
"""Retriever using vector embeddings and ANN search."""

def __init__(
self,
*,
Expand Down
2 changes: 0 additions & 2 deletions backend/app/application/dto/document_schemas.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""HTTP-facing schemas for the documents API."""

from __future__ import annotations

from datetime import datetime
Expand Down
2 changes: 0 additions & 2 deletions backend/app/application/services/chat_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ async def ask_question(
citations_as_dicts = [c.__dict__ for c in citations]

async with self._uow_factory() as uow:
uow: UnitOfWork
history = await uow.messages.get_recent_by_session(session_id, limit=6)

user_msg = ChatMessage.create_user_message(
Expand All @@ -84,7 +83,6 @@ async def ask_question(
yield chunk

async with self._uow_factory() as uow:
uow: UnitOfWork
ai_msg = ChatMessage.create_assistant_message(
session_id=session_id,
org_id=org_id,
Expand Down
1 change: 0 additions & 1 deletion backend/app/application/services/iam_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ async def register_via_invitation(
metadata_json={},
)
)
# Flush user row so membership FK is satisfied.
await session.flush()
session.add(
OrganizationMembershipORM(
Expand Down
1 change: 0 additions & 1 deletion backend/app/application/use_cases/ingest_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

logger = logging.getLogger(__name__)


_FILENAME_SAFE = re.compile(r"[^A-Za-z0-9._-]+")


Expand Down
Loading
Loading