Skip to content

Commit 52bc0fb

Browse files
Adityav369 and claude committed
Read document status from a local snapshot instead of per-access API calls
Document.status / is_processing / is_ingested / is_failed / error now read the status already carried on the document (system_metadata) instead of calling get_document_status on every access, which caused an N+1 when iterating documents. status now also returns `as_of` and `source` ("local"/"not_loaded"); when status was not fetched (e.g. projected away), is_* return False and make no network call.
- Add Document.refresh() for an explicit live re-fetch; docstrings point to refresh() / wait_for_completion() for the current status.
- Add a cheap `status` projection: list_documents(fields=[..., "status"]) reads system_metadata->>'status' via JSON-path (no document text), so is_* resolve locally.
- Bump SDK to 1.2.6; add SDK + server projection tests incl. a call-counting regression guard.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent bbe803c commit 52bc0fb

8 files changed

Lines changed: 244 additions & 23 deletions

File tree

core/database/postgres_database.py

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@
6565
"app_id",
6666
"end_user_id",
6767
]
68+
# Lightweight scalar keys inside system_metadata that can be projected cheaply via a
69+
# JSON-path read (system_metadata->>'<key>'), without materializing the full
70+
# system_metadata blob (which holds the document text). Returned in a slim
71+
# system_metadata dict so the SDK can read e.g. doc status locally with no extra call.
72+
DOCUMENT_STATUS_PROJECTION_KEYS = {"status", "error", "created_at", "updated_at", "progress", "version"}
6873

6974

7075
class PostgresDatabase:
@@ -629,9 +634,10 @@ def _resolve_document_sort_clause(self, sort_by: Optional[str], sort_direction:
629634
def _resolve_document_projection_fields(fields: Optional[List[str]]) -> Optional[set]:
630635
"""Resolve requested API fields to the document table columns needed to serve them.
631636
632-
Note: ``summary_*`` and ``page_count`` are derived from ``system_metadata``, so
633-
projecting them reads that column (which also holds the full document text). Plain
634-
``metadata`` lives in its own column and stays lightweight.
637+
Lightweight status keys (``status``, ``error``, timestamps) are projected via a cheap
638+
JSON-path read of ``system_metadata`` rather than the full column, so they do not pull
639+
the document text. ``summary_*`` and ``page_count`` are derived from the full
640+
``system_metadata`` column. Plain ``metadata`` lives in its own column and stays light.
635641
"""
636642
if not fields:
637643
return None
@@ -645,8 +651,11 @@ def _resolve_document_projection_fields(fields: Optional[List[str]]) -> Optional
645651
for root in requested_roots:
646652
if root in DOCUMENT_PROJECTION_COLUMN_MAP:
647653
resolved_fields.add(root)
654+
elif root in DOCUMENT_STATUS_PROJECTION_KEYS:
655+
# Cheap JSON-path read of a single system_metadata key (no full blob).
656+
resolved_fields.add(f"sm:{root}")
648657
elif root in SUMMARY_METADATA_KEYS:
649-
# summary_* values are derived from system_metadata.
658+
# summary_* values are derived from the full system_metadata column.
650659
resolved_fields.add("system_metadata")
651660
elif root == "page_count":
652661
resolved_fields.add("system_metadata")
@@ -657,15 +666,35 @@ def _resolve_document_projection_fields(fields: Optional[List[str]]) -> Optional
657666
@staticmethod
658667
def _document_projection_columns(fields: set):
659668
"""Return a stable list of labeled SQLAlchemy columns for a document projection."""
660-
return [
669+
columns = [
661670
DOCUMENT_PROJECTION_COLUMN_MAP[field].label(field) for field in DOCUMENT_PROJECTION_ORDER if field in fields
662671
]
672+
# Cheap per-key JSON-path reads for lightweight system_metadata scalars.
673+
for field in sorted(f for f in fields if isinstance(f, str) and f.startswith("sm:")):
674+
key = field[len("sm:") :]
675+
columns.append(DocumentModel.system_metadata[key].astext.label(f"__sm_{key}"))
676+
return columns
663677

664678
@staticmethod
665679
def _document_projection_row_to_dict(row: Any, fields: set) -> Dict[str, Any]:
666680
"""Convert a projected document row (a SQLAlchemy mapping) to the public document dict shape."""
667681
document = dict(row)
668682

683+
# Reassemble cheaply-projected system_metadata scalars (labeled __sm_<key>) into a
684+
# slim system_metadata dict so the public shape matches a full document.
685+
status_keys = {f[len("sm:") :] for f in fields if isinstance(f, str) and f.startswith("sm:")}
686+
if status_keys:
687+
slim = {}
688+
for key in status_keys:
689+
label = f"__sm_{key}"
690+
if label in document:
691+
slim[key] = document.pop(label)
692+
existing = document.get("system_metadata")
693+
if isinstance(existing, dict):
694+
existing.update(slim)
695+
else:
696+
document["system_metadata"] = slim
697+
669698
for key in ("metadata", "metadata_types", "storage_info", "system_metadata", "additional_metadata"):
670699
if key in document and document[key] is None:
671700
document[key] = {}

core/routes/utils.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ def _add_derived_fields(document_dict: Dict[str, Any]) -> Dict[str, Any]:
3535
return enriched
3636

3737

38+
# Lightweight processing-state keys that live under system_metadata. When requested as a
39+
# top-level field (e.g. "status"), project the corresponding system_metadata.<key> so the
40+
# value survives in the slim system_metadata the SDK reads locally.
41+
_STATUS_ALIAS_KEYS = {"status", "error", "created_at", "updated_at", "progress", "version"}
42+
43+
3844
def project_document_fields(document_dict: Dict[str, Any], fields: Optional[List[str]]) -> Dict[str, Any]:
3945
"""
4046
Project document data to a subset of fields, always including the external_id for reference.
@@ -45,7 +51,11 @@ def project_document_fields(document_dict: Dict[str, Any], fields: Optional[List
4551
return document_dict
4652

4753
projected: Dict[str, Any] = {}
48-
normalized_fields: List[str] = [field.strip() for field in fields if field and field.strip()]
54+
normalized_fields: List[str] = [
55+
f"system_metadata.{field.strip()}" if field.strip() in _STATUS_ALIAS_KEYS else field.strip()
56+
for field in fields
57+
if field and field.strip()
58+
]
4959
include_external_id = "external_id" in normalized_fields
5060

5161
for field_path in normalized_fields:

core/tests/unit/test_document_projection.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,34 @@ def test_metadata_projection_does_not_read_content(self):
5353
assert "system_metadata" not in sql
5454

5555

56+
class TestStatusProjection:
57+
"""Lightweight `status` projection reads only a JSON-path, not the full blob."""
58+
59+
def test_status_resolves_to_cheap_json_path(self):
60+
resolved = PostgresDatabase._resolve_document_projection_fields(["status"])
61+
assert "sm:status" in resolved
62+
assert "external_id" in resolved
63+
assert "system_metadata" not in resolved # never the full column
64+
65+
def test_status_sql_uses_json_path(self):
66+
resolved = PostgresDatabase._resolve_document_projection_fields(["external_id", "status"])
67+
sql = str(select(*PostgresDatabase._document_projection_columns(resolved)))
68+
assert "system_metadata ->>" in sql
69+
assert "__sm_status" in sql
70+
71+
def test_row_reassembles_slim_system_metadata(self):
72+
resolved = PostgresDatabase._resolve_document_projection_fields(["status", "error"])
73+
row = {"external_id": "d1", "__sm_status": "completed", "__sm_error": None}
74+
out = PostgresDatabase._document_projection_row_to_dict(row, resolved)
75+
assert out["system_metadata"]["status"] == "completed"
76+
assert "__sm_status" not in out
77+
78+
def test_app_layer_projection_keeps_status(self):
79+
doc = {"external_id": "d1", "system_metadata": {"status": "completed"}}
80+
out = project_document_fields(doc, ["external_id", "status"])
81+
assert out["system_metadata"] == {"status": "completed"}
82+
83+
5684
class TestProjectionRowToDict:
5785
"""PostgresDatabase._document_projection_row_to_dict."""
5886

sdks/python/CHANGELOG.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [1.2.6] - 2026-06-19
11+
12+
### Changed
13+
- `Document` status is now a local snapshot read instead of a per-access API call.
14+
`Document.status` / `is_processing` / `is_ingested` / `is_failed` / `error` read the status
15+
already carried on the document (`system_metadata`), eliminating an N+1 when iterating
16+
documents (previously each `is_*` access made its own request). `status` now also returns
17+
`as_of` (when the snapshot was pulled) and `source` (`"local"` / `"not_loaded"`). If status
18+
was not fetched (e.g. projected away), `is_*` return `False` and make **no** network call.
19+
Use `Document.refresh()` or `wait_for_completion()` for the current live status.
20+
21+
### Added
22+
- `Document.refresh()` — re-fetch a document from the server to get its current status.
23+
- `status` is now a cheap, projectable field: `list_documents(fields=[..., "status"])` returns
24+
the processing status (and `error`/timestamps) via a JSON-path read — without downloading the
25+
full document text — so `is_*` resolve locally with zero extra calls.
26+
1027
## [1.2.5] - 2026-06-19
1128

1229
### Fixed

sdks/python/morphik/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616
"MigrationResult",
1717
]
1818

19-
__version__ = "1.2.5"
19+
__version__ = "1.2.6"

sdks/python/morphik/models.py

Lines changed: 75 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
from datetime import date, datetime
1+
from datetime import date, datetime, timezone
22
from decimal import Decimal
33
from pathlib import Path
44
from typing import Any, BinaryIO, Dict, List, Literal, Optional, Union
55

6-
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
6+
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, field_validator, model_validator
77

88

99
def _reconstruct_metadata_types(metadata: Dict[str, Any], metadata_types: Dict[str, str]) -> Dict[str, Any]:
@@ -74,6 +74,8 @@ class Document(BaseModel):
7474

7575
# Client reference for update methods
7676
_client = None
77+
# When this document snapshot was pulled from the server (UTC), set at construction.
78+
_fetched_at: Optional[datetime] = PrivateAttr(default_factory=lambda: datetime.now(timezone.utc))
7779

7880
@model_validator(mode="after")
7981
def _reconstruct_types(self) -> "Document":
@@ -85,38 +87,96 @@ def _reconstruct_types(self) -> "Document":
8587

8688
@property
8789
def status(self) -> Dict[str, Any]:
88-
"""Get the latest processing status of the document from the API.
89-
90-
Returns:
91-
Dict[str, Any]: Status information including current status, potential errors, and other metadata
90+
"""Processing status of the document **as of when it was fetched** (a snapshot).
91+
92+
This reads the status already carried on the document (in ``system_metadata``) and
93+
does **not** make a network call. The returned dict includes:
94+
95+
- ``status``: ``processing`` / ``completed`` / ``failed`` (or ``unknown``)
96+
- ``error``: error message when ``status == "failed"``
97+
- ``created_at`` / ``updated_at``: server timestamps for the document
98+
- ``as_of``: ISO-8601 timestamp of when this snapshot was pulled from the server
99+
- ``source``: ``"local"`` (read from the document) or ``"not_loaded"``
100+
101+
If the document was fetched with a field projection that excluded the status
102+
(e.g. ``list_documents(fields=["metadata"])`` without ``"status"``), there is no
103+
local status and this returns ``status="unknown"`` / ``source="not_loaded"`` —
104+
**it does not make a network call.** To include status cheaply in a projection,
105+
add ``"status"`` to ``fields``; for the *current* live status, call :meth:`refresh`
106+
or :meth:`wait_for_completion` instead of re-reading this property in a loop.
92107
"""
93-
if self._client is None:
94-
raise ValueError(
95-
"Document instance not connected to a client. Use a document returned from a Morphik client method."
96-
)
97-
return self._client.get_document_status(self.external_id)
108+
sm = self.system_metadata or {}
109+
if "status" in sm:
110+
return {
111+
"document_id": self.external_id,
112+
"status": sm.get("status"),
113+
"error": sm.get("error"),
114+
"created_at": sm.get("created_at"),
115+
"updated_at": sm.get("updated_at"),
116+
"as_of": self._fetched_at.isoformat() if self._fetched_at else sm.get("updated_at"),
117+
"source": "local",
118+
}
119+
# Status was not fetched with this document (e.g. projected away). Do not silently
120+
# make a per-document API call — report it as not loaded.
121+
return {
122+
"document_id": self.external_id,
123+
"status": "unknown",
124+
"error": None,
125+
"as_of": self._fetched_at.isoformat() if self._fetched_at else None,
126+
"source": "not_loaded",
127+
}
98128

99129
@property
100130
def is_processing(self) -> bool:
101-
"""Check if the document is still being processed."""
131+
"""True if the document was still processing **as of when it was fetched** (snapshot).
132+
133+
See :attr:`status`. Returns ``False`` if status was not loaded (projected away). Use
134+
:meth:`refresh` / :meth:`wait_for_completion` for the current state.
135+
"""
102136
return self.status.get("status") == "processing"
103137

104138
@property
105139
def is_ingested(self) -> bool:
106-
"""Check if the document has completed processing."""
140+
"""True if the document had completed processing **as of when it was fetched** (snapshot).
141+
142+
See :attr:`status`. Returns ``False`` if status was not loaded (projected away). Use
143+
:meth:`refresh` / :meth:`wait_for_completion` for the current state.
144+
"""
107145
return self.status.get("status") == "completed"
108146

109147
@property
110148
def is_failed(self) -> bool:
111-
"""Check if document processing has failed."""
149+
"""True if document processing had failed **as of when it was fetched** (snapshot).
150+
151+
See :attr:`status`. Returns ``False`` if status was not loaded (projected away). Use
152+
:meth:`refresh` for the current state.
153+
"""
112154
return self.status.get("status") == "failed"
113155

114156
@property
115157
def error(self) -> Optional[str]:
116-
"""Get the error message if processing failed."""
158+
"""Error message if processing had failed (snapshot; see :attr:`status`)."""
117159
status_info = self.status
118160
return status_info.get("error") if status_info.get("status") == "failed" else None
119161

162+
def refresh(self) -> "Document":
163+
"""Re-fetch this document from the server and return the updated snapshot.
164+
165+
Use this (or :meth:`wait_for_completion`) when you need the *current* status rather
166+
than the snapshot carried on this object::
167+
168+
doc = doc.refresh()
169+
if doc.is_ingested:
170+
...
171+
172+
Requires a document returned from a Morphik client method.
173+
"""
174+
if self._client is None:
175+
raise ValueError(
176+
"Document instance not connected to a client. Use a document returned from a Morphik client method."
177+
)
178+
return self._client.get_document(self.external_id)
179+
120180
def wait_for_completion(self, timeout_seconds=300, check_interval_seconds=2, progress_callback=None):
121181
"""Wait for document processing to complete.
122182
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
"""Document status is a local snapshot read — no per-access network calls."""
2+
3+
import pytest
4+
from morphik.models import Document
5+
6+
7+
def test_status_reads_local_snapshot_without_a_call():
8+
doc = Document(
9+
external_id="d1",
10+
content_type="text/plain",
11+
system_metadata={"status": "failed", "error": "boom", "updated_at": "2026-06-01T00:00:00"},
12+
)
13+
snap = doc.status
14+
assert snap["status"] == "failed"
15+
assert snap["error"] == "boom"
16+
assert snap["source"] == "local"
17+
assert snap["as_of"] # stamped at construction
18+
assert doc.is_failed and not doc.is_processing and not doc.is_ingested
19+
assert doc.error == "boom"
20+
21+
22+
def test_status_not_loaded_makes_no_call():
23+
# Status projected away (no system_metadata) and no client attached: must not call out.
24+
doc = Document(external_id="d2", content_type="text/plain", metadata={"a": 1})
25+
snap = doc.status
26+
assert snap["status"] == "unknown"
27+
assert snap["source"] == "not_loaded"
28+
assert doc.is_failed is False
29+
assert doc.is_processing is False
30+
assert doc.is_ingested is False
31+
32+
33+
def test_projected_status_is_read_locally():
34+
# Shape the server returns for list_documents(fields=[..., "status"]).
35+
doc = Document(external_id="d3", content_type="text/plain", system_metadata={"status": "completed"})
36+
assert doc.is_ingested
37+
assert doc.status["source"] == "local"
38+
39+
40+
def test_refresh_requires_client():
41+
doc = Document(external_id="d4", content_type="text/plain")
42+
with pytest.raises(ValueError):
43+
doc.refresh()
44+
45+
46+
class _CountingClient:
47+
"""Records any status/document fetch so tests can assert zero calls."""
48+
49+
def __init__(self):
50+
self.calls = []
51+
52+
def get_document_status(self, *args, **kwargs):
53+
self.calls.append("get_document_status")
54+
return {"status": "processing"}
55+
56+
def get_document(self, *args, **kwargs):
57+
self.calls.append("get_document")
58+
return None
59+
60+
61+
def test_is_star_make_zero_client_calls_when_status_local():
62+
# Regression guard for the N+1: reading status/is_* on a document that already carries
63+
# its status must NOT make any client call, even with a client attached.
64+
doc = Document(external_id="d5", content_type="text/plain", system_metadata={"status": "completed"})
65+
client = _CountingClient()
66+
doc._client = client
67+
_ = (doc.status, doc.is_failed, doc.is_processing, doc.is_ingested, doc.error)
68+
assert client.calls == [], f"is_*/status must make zero client calls, made: {client.calls}"
69+
70+
71+
def test_is_star_make_zero_calls_when_not_loaded_even_with_client():
72+
doc = Document(external_id="d6", content_type="text/plain") # status not loaded
73+
client = _CountingClient()
74+
doc._client = client
75+
_ = (doc.is_failed, doc.is_processing, doc.is_ingested)
76+
assert client.calls == [], f"not-loaded status must make zero calls, made: {client.calls}"
77+
assert doc.status["source"] == "not_loaded"

sdks/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "morphik"
7-
version = "1.2.5"
7+
version = "1.2.6"
88
authors = [
99
{ name = "Morphik", email = "founders@morphik.ai" },
1010
]

0 commit comments

Comments
 (0)