Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from core.routes.health import router as health_router
from core.routes.ingest import router as ingest_router
from core.routes.logs import router as logs_router # noqa: E402 – import after FastAPI app
from core.routes.migrate import router as migrate_router
from core.routes.models import router as models_router
from core.routes.usage import router as usage_router
from core.routes.v2 import router as v2_router
Expand Down Expand Up @@ -318,6 +319,9 @@ def _extract_provider(model_name: str) -> str:
# Register ingest router
app.include_router(ingest_router)

# Register migration router
app.include_router(migrate_router)

# Register documents router
app.include_router(documents_router)

Expand Down
10 changes: 9 additions & 1 deletion core/models/responses.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Literal, Optional

from pydantic import BaseModel, Field

from core.models.documents import Document
from core.models.folders import Folder


Expand Down Expand Up @@ -76,6 +77,13 @@ class DocumentDownloadUrlResponse(BaseModel):
expires_in: int


class MigrationIngestResponse(BaseModel):
"""Response for a migration document ingest request."""

status: Literal["created", "skipped"]
document: Document


class ChatTitleResponse(BaseModel):
"""Response for chat title update endpoint"""

Expand Down
76 changes: 76 additions & 0 deletions core/routes/migrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import logging
from typing import Literal, Optional

import arq
from fastapi import APIRouter, Depends, Form, HTTPException, UploadFile

from core.auth_utils import verify_token
from core.dependencies import get_redis_pool
from core.models.auth import AuthContext
from core.models.responses import MigrationIngestResponse
from core.routes.utils import parse_bool, parse_json_dict
from core.services_init import ingestion_service
from core.utils.typed_metadata import TypedMetadataError

router = APIRouter(prefix="/migrate", tags=["Migration"])
logger = logging.getLogger(__name__)


@router.post("/document", response_model=MigrationIngestResponse)
async def migrate_document(
file: UploadFile,
source_document_id: str = Form(...),
metadata: str = Form("{}"),
metadata_types: str = Form("{}"),
use_colpali: Optional[bool] = Form(None),
folder_name: Optional[str] = Form(None),
end_user_id: Optional[str] = Form(None),
on_conflict: Literal["skip", "fail"] = Form("skip"),
auth: AuthContext = Depends(verify_token),
redis: arq.ArqRedis = Depends(get_redis_pool),
) -> MigrationIngestResponse:
"""Ingest a migrated source document while preserving its document ID."""
source_document_id = (source_document_id or "").strip()
if not source_document_id:
raise HTTPException(status_code=400, detail="source_document_id is required")

try:
existing_doc = await ingestion_service.db.get_document(source_document_id, auth)
if existing_doc is not None:
if on_conflict == "skip":
return MigrationIngestResponse(status="skipped", document=existing_doc)
raise HTTPException(
status_code=409,
detail=f"Document {source_document_id} already exists in the target app",
)

metadata_dict = parse_json_dict(metadata, "metadata", default={})
metadata_types_dict = parse_json_dict(metadata_types, "metadata_types", default={})
file_content = await file.read()
filename = file.filename or "uploaded_file"

document = await ingestion_service.ingest_file_content(
file_content_bytes=file_content,
filename=filename,
content_type=file.content_type,
metadata=metadata_dict,
auth=auth,
redis=redis,
metadata_types=metadata_types_dict,
folder_name=folder_name,
end_user_id=end_user_id,
use_colpali=parse_bool(use_colpali),
external_id=source_document_id,
)
return MigrationIngestResponse(status="created", document=document)
except HTTPException:
raise
except PermissionError as exc:
raise HTTPException(status_code=403, detail=str(exc))
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
except TypedMetadataError as exc:
raise HTTPException(status_code=400, detail=str(exc))
except Exception as exc: # noqa: BLE001
logger.error("Error migrating document %s: %s", source_document_id, exc)
raise HTTPException(status_code=500, detail=f"Error migrating document: {str(exc)}")
11 changes: 10 additions & 1 deletion core/services/ingestion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ async def ingest_file_content(
folder_name: Optional[Union[str, List[str]]] = None,
end_user_id: Optional[str] = None,
use_colpali: Optional[bool] = False,
external_id: Optional[str] = None,
) -> Document:
"""
Ingests file content from bytes. Saves to storage, creates document record,
Expand All @@ -583,6 +584,7 @@ async def ingest_file_content(
)

doc = Document(
external_id=external_id or str(uuid.uuid4()),
filename=filename,
content_type=resolved_content_type,
metadata=metadata or {},
Expand All @@ -605,7 +607,14 @@ async def ingest_file_content(
doc.metadata_types = metadata_bundle.types

# 1. Create initial document record in DB
await self.db.store_document(doc, auth, metadata_bundle=metadata_bundle)
stored = await self.db.store_document(doc, auth, metadata_bundle=metadata_bundle)
if not stored:
if external_id:
raise HTTPException(
status_code=409,
detail=f"Document {external_id} already exists or could not be created",
)
raise HTTPException(status_code=500, detail="Failed to create document metadata")
logger.info(f"Initial document record created for {filename} (doc_id: {doc.external_id})")

try:
Expand Down
48 changes: 48 additions & 0 deletions core/tests/unit/test_ingestion_service_metadata_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from types import ModuleType

import pytest
from fastapi import HTTPException

from core.models.auth import AuthContext
from core.models.documents import Document
Expand Down Expand Up @@ -50,6 +51,22 @@ async def update_document(self, document_id: str, updates, auth: AuthContext, me
return True


class FakeStoreFailureDatabase(FakeDatabase):
def __init__(self, doc: Document):
super().__init__(doc)
self.store_calls = []

async def store_document(self, document: Document, auth: AuthContext, metadata_bundle=None):
self.store_calls.append(
{
"document": document,
"auth": auth,
"metadata_bundle": metadata_bundle,
}
)
return False


def _auth() -> AuthContext:
return AuthContext(user_id="user-1", app_id="app-1")

Expand Down Expand Up @@ -83,6 +100,37 @@ def _service(doc: Document):
return IngestionService(db, None, None, None, None), db


@pytest.mark.asyncio
async def test_file_ingest_aborts_when_initial_document_store_fails():
doc = _document()
db = FakeStoreFailureDatabase(doc)
service = IngestionService(db, None, None, None, None)

async def noop_limit_check(auth, content_length, document_id):
return None

service._verify_ingest_and_storage_limits = noop_limit_check
service._resolve_content_type = lambda content, filename, content_type: "text/plain"

with pytest.raises(HTTPException) as exc_info:
await service.ingest_file_content(
file_content_bytes=b"hello",
filename="report.txt",
content_type="text/plain",
metadata={"custom": "value"},
auth=_auth(),
redis=None,
metadata_types={"custom": "string"},
use_colpali=False,
external_id="doc-1",
)

assert exc_info.value.status_code == 409
assert "doc-1" in exc_info.value.detail
assert len(db.store_calls) == 1
assert db.update_calls == []


@pytest.mark.asyncio
async def test_metadata_only_update_allows_unchanged_managed_metadata_fields():
doc = _document()
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ response = db.query(
)

print(response.completion)

# Migrate this app's documents into another Morphik deployment.
# Run this from a machine that can reach both source and target, such as
# inside a customer's VPN for on-prem targets.
result = db.migrate(target_uri="morphik://owner_id:token@onprem.example.com", target_is_local=True)
print(result.created_count, result.skipped_count, result.failed_count)
```

### Nested Folders & Folder Depth
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/morphik/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from .async_ import AsyncMorphik
from .models import Document, DocumentQueryResponse, Summary
from .models import Document, DocumentQueryResponse, MigrationDocumentResult, MigrationResult, Summary
from .sync import Morphik

__all__ = [
Expand All @@ -12,6 +12,8 @@
"Document",
"Summary",
"DocumentQueryResponse",
"MigrationDocumentResult",
"MigrationResult",
]

__version__ = "1.2.2"
55 changes: 54 additions & 1 deletion sdks/python/morphik/_shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@

import json
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import quote

from pydantic import BaseModel

MAX_LIMIT = 500
MIN_LOG_HOURS = 0.1
MAX_LOG_HOURS = 168.0
MIGRATION_SOURCE_METADATA_KEY = "_morphik_migration"
MIGRATION_RESERVED_METADATA_FIELDS = {
"app_id",
"end_user_id",
"external_id",
"filename",
"folder_id",
"folder_name",
"folder_path",
"owner_id",
}


def merge_folders(
Expand Down Expand Up @@ -197,3 +208,45 @@ def normalize_additional_folders(
if additional_folders:
return list(additional_folders) + folder_list
return folder_list


def build_migration_metadata(
document: Any,
*,
include_source_metadata: bool = True,
) -> Tuple[Dict[str, Any], Dict[str, str]]:
"""Prepare document metadata for migration ingestion.

The target API owns fields such as external_id, folder_name, and app_id, so
those values must travel through dedicated migration parameters instead of
user metadata.
"""
metadata = dict(getattr(document, "metadata", None) or {})
metadata_types = dict(getattr(document, "metadata_types", None) or {})

for field in MIGRATION_RESERVED_METADATA_FIELDS:
metadata.pop(field, None)
metadata_types.pop(field, None)

if include_source_metadata:
system_metadata = getattr(document, "system_metadata", None) or {}
source_info = {
"source_document_id": getattr(document, "external_id", None),
"source_app_id": getattr(document, "app_id", None),
"source_filename": getattr(document, "filename", None),
"source_created_at": system_metadata.get("created_at") if isinstance(system_metadata, dict) else None,
"source_updated_at": system_metadata.get("updated_at") if isinstance(system_metadata, dict) else None,
}
source_info = {key: value for key, value in source_info.items() if value is not None}

existing_source_info = metadata.get(MIGRATION_SOURCE_METADATA_KEY)
if isinstance(existing_source_info, dict):
existing_source_info = dict(existing_source_info)
for key, value in source_info.items():
existing_source_info.setdefault(key, value)
metadata[MIGRATION_SOURCE_METADATA_KEY] = existing_source_info
else:
metadata[MIGRATION_SOURCE_METADATA_KEY] = source_info
metadata_types.setdefault(MIGRATION_SOURCE_METADATA_KEY, "object")

return metadata, metadata_types
Loading
Loading