Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/database/postgres_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,28 @@ async def delete_document(self, document_id: str, auth: AuthContext) -> bool:
if doc_model:
await session.delete(doc_model)
await session.commit()

# --------------------------------------------------------------------------------
# Maintain referential integrity: remove the deleted document ID from any folders
# that still list it in their document_ids JSONB array. This prevents the UI from
# requesting stale IDs after a delete.
# --------------------------------------------------------------------------------
try:
await session.execute(
text(
"""
UPDATE folders
SET document_ids = document_ids - :doc_id
WHERE document_ids ? :doc_id
"""
),
{"doc_id": document_id},
)
await session.commit()
except Exception as upd_err: # noqa: BLE001
# Non-fatal – log but keep the document deleted so user doesn't see it any more.
logger.error("Failed to remove deleted document %s from folders: %s", document_id, upd_err)

return True
return False

Expand Down
96 changes: 59 additions & 37 deletions core/vector_store/multi_vector_store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import logging
import time
Expand All @@ -8,6 +9,7 @@
import psycopg
import torch
from pgvector.psycopg import Bit, register_vector
from psycopg_pool import ConnectionPool

from core.models.chunk import DocumentChunk

Expand Down Expand Up @@ -36,7 +38,10 @@ def __init__(
if uri.startswith("postgresql+asyncpg://"):
uri = uri.replace("postgresql+asyncpg://", "postgresql://")
self.uri = uri
self.conn = None
# Shared connection pool – re-uses sockets across jobs, avoids TLS
# handshakes and auth for every INSERT call. A small pool is enough
# because inserts are short-lived.
self.pool: ConnectionPool = ConnectionPool(conninfo=self.uri, min_size=1, max_size=10, timeout=60)
self.max_retries = max_retries
self.retry_delay = retry_delay
# Don't initialize here - initialization will be handled separately
Expand All @@ -57,20 +62,22 @@ def get_connection(self):
# Try to establish a new connection with retries
while attempt < self.max_retries:
try:
# Always create a fresh connection for critical operations
conn = psycopg.connect(self.uri, autocommit=True)
# Register vector extension on every new connection
register_vector(conn)
# Borrow a pooled connection (blocking wait). Autocommit stays
# disabled so we can batch-commit.
conn = self.pool.getconn()

try:
yield conn
return
finally:
# Always close connections after use
# Release connection back to the pool
try:
conn.close()
self.pool.putconn(conn)
except Exception:
pass
try:
conn.close()
except Exception:
pass
except psycopg.OperationalError as e:
last_error = e
attempt += 1
Expand Down Expand Up @@ -246,16 +253,8 @@ async def store_embeddings(self, chunks: List[DocumentChunk]) -> Tuple[bool, Lis
if not rows:
return True, []

with self.get_connection() as conn:
with conn.cursor() as cur:
cur.executemany(
"""
INSERT INTO multi_vector_embeddings
(document_id, chunk_number, content, chunk_metadata, embeddings)
VALUES (%s, %s, %s, %s, %s)
""",
rows,
)
# Off-load blocking DB I/O to a thread so we don't block the event loop
await asyncio.to_thread(self._bulk_insert_rows, rows)

stored_ids = [f"{r[0]}-{r[1]}" for r in rows]
logger.debug(f"{len(stored_ids)} multi-vector embeddings added in bulk")
Expand All @@ -272,30 +271,37 @@ async def query_similar(
# Convert query embeddings to binary format
binary_query_embeddings = self._binary_quantize(query_embedding)

# Build query
query = """
SELECT id, document_id, chunk_number, content, chunk_metadata,
max_sim(embeddings, %s) AS similarity
FROM multi_vector_embeddings
"""
def _bit_raw(b: Bit) -> str:
    """Return the bare ``0``/``1`` string carried by a ``Bit`` value.

    The text is recovered by parsing ``str(b)``; the accepted spellings are
    ``Bit('1010')``, ``Bit("1010")``, ``Bit(1010)`` and a plain ``1010``.

    Args:
        b: Value whose ``str()`` form contains the bit string.

    Returns:
        The bit string without the ``Bit(...)`` wrapper or surrounding quotes.

    Raises:
        ValueError: If anything other than ``0``/``1`` remains after
            unwrapping. The caller interpolates the result into a SQL
            ``B'...'`` literal, so an unexpected representation must fail
            here rather than reach the database as malformed SQL.
    """
    text = str(b)
    # Only unwrap when both ends of the wrapper are present; slicing blindly
    # would silently drop a real bit if the closing parenthesis were missing.
    if text.startswith("Bit(") and text.endswith(")"):
        text = text[4:-1]
    bits = text.strip("'\"")
    if set(bits) - {"0", "1"}:
        raise ValueError(f"Unexpected bit string representation: {str(b)!r}")
    return bits

bit_strings = [_bit_raw(b) for b in binary_query_embeddings]
array_literal = "ARRAY[" + ",".join(f"B'{s}'" for s in bit_strings) + "]::bit(128)[]"

params = [binary_query_embeddings]
# Start query with inlined array literal (internal usage only)
query = (
"SELECT id, document_id, chunk_number, content, chunk_metadata, "
f"max_sim(embeddings, {array_literal}) AS similarity "
"FROM multi_vector_embeddings"
)

params: List = []

# Add document filter if needed with proper parameterization
if doc_ids:
# Use placeholders for each document ID
placeholders = ", ".join(["%s"] * len(doc_ids))
query += f" WHERE document_id IN ({placeholders})"
# Add document IDs to params
params.extend(doc_ids)

# Add ordering and limit
query += " ORDER BY similarity DESC LIMIT %s"
params.append(k)

# Execute query with retry logic
with self.get_connection() as conn:
result = conn.execute(query, params).fetchall()
result = conn.execute(query, tuple(params)).fetchall()

# Convert to DocumentChunks
chunks = []
Expand Down Expand Up @@ -404,9 +410,25 @@ async def delete_chunks_by_document_id(self, document_id: str) -> bool:

def close(self):
    """Close the connection pool and, with it, all underlying connections.

    Shutdown is best-effort: any error raised while closing the pool is
    logged and swallowed so callers tearing down the store are not
    interrupted.
    """
    try:
        self.pool.close()
    except Exception as e:
        logger.error(f"Error closing connection pool: {e}")

# ----------------- internal helpers -----------------

def _bulk_insert_rows(self, rows: List[Tuple]) -> None:
    """Insert all embedding rows in a single transaction (synchronous).

    This performs blocking database I/O, so async callers are expected to
    run it in a worker thread rather than on the event loop.

    Args:
        rows: One tuple per chunk, in column order ``(document_id,
            chunk_number, content, chunk_metadata, embeddings)``.

    Raises:
        Exception: Any error from the driver is re-raised unchanged after
            the open transaction has been rolled back.
    """
    insert_sql = (
        "\n"
        "INSERT INTO multi_vector_embeddings\n"
        "(document_id, chunk_number, content, chunk_metadata, embeddings)\n"
        "VALUES (%s, %s, %s, %s, %s)\n"
    )
    with self.get_connection() as conn:
        try:
            with conn.cursor() as cur:
                cur.executemany(insert_sql, rows)
            # One commit for the whole batch instead of one per row.
            conn.commit()
        except Exception:
            # The connection is not in autocommit mode, so a failed batch
            # leaves a transaction open. Roll it back here so the connection
            # is handed back with no pending transaction.
            try:
                conn.rollback()
            except Exception as rollback_err:
                logger.warning(
                    "Rollback after failed bulk insert also failed: %s",
                    rollback_err,
                )
            raise
4 changes: 2 additions & 2 deletions ee/ui-component/hooks/useMorphikChat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ export function useMorphikChat({

const [queryOptions, setQueryOptions] = useState<QueryOptions>({
filters: initialQueryOptions.filters ?? "{}",
k: initialQueryOptions.k ?? 10,
k: initialQueryOptions.k ?? 5,
min_score: initialQueryOptions.min_score ?? 0.7,
use_reranking: initialQueryOptions.use_reranking ?? false,
use_colpali: initialQueryOptions.use_colpali ?? true,
max_tokens: initialQueryOptions.max_tokens ?? 1024,
temperature: initialQueryOptions.temperature ?? 0.7,
temperature: initialQueryOptions.temperature ?? 0.5,
graph_name: initialQueryOptions.graph_name,
folder_name: initialQueryOptions.folder_name,
});
Expand Down
2 changes: 1 addition & 1 deletion ee/ui-component/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@morphik/ui",
"version": "0.2.22",
"version": "0.2.24",
"private": true,
"description": "Modern UI component for Morphik - A powerful document processing and querying system",
"author": "Morphik Team",
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies = [
"pre-commit>=4.2.0",
"psycopg>=3.2.9",
"psycopg-binary>=3.2.9",
"psycopg-pool>=3.2.6",
"psycopg2-binary>=2.9.9",
"pydantic>=2.11.4",
"pydantic-settings>=2.9.1",
Expand Down
24 changes: 14 additions & 10 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.