Skip to content

Commit f02ff3d

Browse files
committed
Benchmark query and retrieval performance
1 parent 36b1f73 commit f02ff3d

2 files changed

Lines changed: 272 additions & 6 deletions

File tree

core/api.py

Lines changed: 120 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import json
33
import logging
4+
import time # Add time import for profiling
45
from datetime import UTC, datetime, timedelta
56
from typing import Any, Dict, List, Optional, Union
67

@@ -42,6 +43,56 @@
4243

4344
# Module-level logger (the FastAPI application instance is set up further below)
4445
logger = logging.getLogger(__name__)
46+
47+
48+
# Performance tracking class
49+
class PerformanceTracker:
    """Record how long the named phases of a single operation take.

    At most one phase is "running" at a time. Starting a new phase closes the
    running one, and ``log_summary`` closes whatever is still running before
    it writes the report through the module-level ``logger``.

    Durations are measured with ``time.perf_counter()``, so ``start_time`` and
    ``phase_start`` are monotonic clock readings (only differences between
    them are meaningful), not epoch timestamps.

    Attributes:
        operation_name: Label used in the summary header.
        start_time: Clock reading taken when the tracker was created.
        phases: Maps phase / sub-operation name to its duration in seconds.
            Recording the same name twice overwrites the earlier value.
        current_phase: Name of the running phase, or ``None``.
        phase_start: Clock reading taken when the running phase started,
            or ``None`` when no phase is running.
    """

    def __init__(self, operation_name: str):
        """Create a tracker and start the overall timer immediately."""
        self.operation_name = operation_name
        self.start_time = time.perf_counter()
        self.phases: Dict[str, float] = {}
        self.current_phase: Optional[str] = None
        self.phase_start: Optional[float] = None

    def start_phase(self, phase_name: str):
        """Close the running phase (if any) and start timing ``phase_name``."""
        self.end_phase()
        self.current_phase = phase_name
        self.phase_start = time.perf_counter()

    def end_phase(self):
        """Record the duration of the running phase; no-op when none is running."""
        # Compare against None rather than relying on truthiness so that an
        # empty phase name or a 0.0 clock reading is still recorded.
        if self.current_phase is not None and self.phase_start is not None:
            self.phases[self.current_phase] = time.perf_counter() - self.phase_start
        self.current_phase = None
        self.phase_start = None

    def add_suboperation(self, name: str, duration: float):
        """Store an externally measured ``duration`` (seconds) under ``name``."""
        self.phases[name] = duration

    def log_summary(self, additional_info: str = ""):
        """Log total time and every recorded phase, longest first.

        Args:
            additional_info: Optional extra line logged after the phase list;
                skipped when empty.
        """
        total_time = time.perf_counter() - self.start_time

        # A phase that is still running is closed so it appears in the report.
        self.end_phase()

        header = f"=== {self.operation_name} Performance Summary ==="
        logger.info(header)
        logger.info(f"Total time: {total_time:.2f}s")

        by_duration = sorted(self.phases.items(), key=lambda item: item[1], reverse=True)
        for phase, duration in by_duration:
            # Guard against a zero total on very coarse clocks.
            percentage = (duration / total_time) * 100 if total_time > 0 else 0
            logger.info(f" - {phase}: {duration:.2f}s ({percentage:.1f}%)")

        if additional_info:
            logger.info(additional_info)
        # Closing rule is exactly as wide as the header line.
        logger.info("=" * len(header))
94+
95+
4596
# ---------------------------------------------------------------------------
4697
# Application instance & core initialisation (moved lifespan, rest unchanged)
4798
# ---------------------------------------------------------------------------
@@ -158,8 +209,13 @@ async def retrieve_chunks(request: RetrieveRequest, auth: AuthContext = Depends(
158209
Returns:
159210
List[ChunkResult]: List of relevant chunks
160211
"""
212+
# Initialize performance tracker
213+
perf = PerformanceTracker(f"Retrieve Chunks: '{request.query[:50]}...'")
214+
161215
try:
162-
return await document_service.retrieve_chunks(
216+
# Main retrieval operation
217+
perf.start_phase("document_service_retrieve_chunks")
218+
results = await document_service.retrieve_chunks(
163219
request.query,
164220
auth,
165221
request.filters,
@@ -169,7 +225,13 @@ async def retrieve_chunks(request: RetrieveRequest, auth: AuthContext = Depends(
169225
request.use_colpali,
170226
request.folder_name,
171227
request.end_user_id,
228+
perf, # Pass performance tracker
172229
)
230+
231+
# Log consolidated performance summary
232+
perf.log_summary(f"Retrieved {len(results)} chunks")
233+
234+
return results
173235
except PermissionError as e:
174236
raise HTTPException(status_code=403, detail=str(e))
175237

@@ -195,8 +257,13 @@ async def retrieve_documents(request: RetrieveRequest, auth: AuthContext = Depen
195257
Returns:
196258
List[DocumentResult]: List of relevant documents
197259
"""
260+
# Initialize performance tracker
261+
perf = PerformanceTracker(f"Retrieve Docs: '{request.query[:50]}...'")
262+
198263
try:
199-
return await document_service.retrieve_docs(
264+
# Main retrieval operation
265+
perf.start_phase("document_service_retrieve_docs")
266+
results = await document_service.retrieve_docs(
200267
request.query,
201268
auth,
202269
request.filters,
@@ -207,6 +274,11 @@ async def retrieve_documents(request: RetrieveRequest, auth: AuthContext = Depen
207274
request.folder_name,
208275
request.end_user_id,
209276
)
277+
278+
# Log consolidated performance summary
279+
perf.log_summary(f"Retrieved {len(results)} documents")
280+
281+
return results
210282
except PermissionError as e:
211283
raise HTTPException(status_code=403, detail=str(e))
212284

@@ -227,16 +299,22 @@ async def batch_get_documents(request: Dict[str, Any], auth: AuthContext = Depen
227299
Returns:
228300
List[Document]: List of documents matching the IDs
229301
"""
302+
# Initialize performance tracker
303+
perf = PerformanceTracker("Batch Get Documents")
304+
230305
try:
231306
# Extract document_ids from request
307+
perf.start_phase("request_extraction")
232308
document_ids = request.get("document_ids", [])
233309
folder_name = request.get("folder_name")
234310
end_user_id = request.get("end_user_id")
235311

236312
if not document_ids:
313+
perf.log_summary("No document IDs provided")
237314
return []
238315

239316
# Create system filters for folder and user scoping
317+
perf.start_phase("filter_creation")
240318
system_filters = {}
241319
if folder_name is not None:
242320
normalized_folder_name = normalize_folder_name(folder_name)
@@ -246,7 +324,14 @@ async def batch_get_documents(request: Dict[str, Any], auth: AuthContext = Depen
246324
if auth.app_id:
247325
system_filters["app_id"] = auth.app_id
248326

249-
return await document_service.batch_retrieve_documents(document_ids, auth, folder_name, end_user_id)
327+
# Main batch retrieval operation
328+
perf.start_phase("batch_retrieve_documents")
329+
results = await document_service.batch_retrieve_documents(document_ids, auth, folder_name, end_user_id)
330+
331+
# Log consolidated performance summary
332+
perf.log_summary(f"Retrieved {len(results)}/{len(document_ids)} documents")
333+
334+
return results
250335
except PermissionError as e:
251336
raise HTTPException(status_code=403, detail=str(e))
252337

@@ -268,17 +353,23 @@ async def batch_get_chunks(request: Dict[str, Any], auth: AuthContext = Depends(
268353
Returns:
269354
List[ChunkResult]: List of chunk results
270355
"""
356+
# Initialize performance tracker
357+
perf = PerformanceTracker("Batch Get Chunks")
358+
271359
try:
272360
# Extract sources from request
361+
perf.start_phase("request_extraction")
273362
sources = request.get("sources", [])
274363
folder_name = request.get("folder_name")
275364
end_user_id = request.get("end_user_id")
276365
use_colpali = request.get("use_colpali")
277366

278367
if not sources:
368+
perf.log_summary("No sources provided")
279369
return []
280370

281371
# Convert sources to ChunkSource objects if needed
372+
perf.start_phase("source_conversion")
282373
chunk_sources = []
283374
for source in sources:
284375
if isinstance(source, dict):
@@ -287,6 +378,7 @@ async def batch_get_chunks(request: Dict[str, Any], auth: AuthContext = Depends(
287378
chunk_sources.append(source)
288379

289380
# Create system filters for folder and user scoping
381+
perf.start_phase("filter_creation")
290382
system_filters = {}
291383
if folder_name is not None:
292384
normalized_folder_name = normalize_folder_name(folder_name)
@@ -296,7 +388,16 @@ async def batch_get_chunks(request: Dict[str, Any], auth: AuthContext = Depends(
296388
if auth.app_id:
297389
system_filters["app_id"] = auth.app_id
298390

299-
return await document_service.batch_retrieve_chunks(chunk_sources, auth, folder_name, end_user_id, use_colpali)
391+
# Main batch retrieval operation
392+
perf.start_phase("batch_retrieve_chunks")
393+
results = await document_service.batch_retrieve_chunks(
394+
chunk_sources, auth, folder_name, end_user_id, use_colpali
395+
)
396+
397+
# Log consolidated performance summary
398+
perf.log_summary(f"Retrieved {len(results)}/{len(sources)} chunks")
399+
400+
return results
300401
except PermissionError as e:
301402
raise HTTPException(status_code=403, detail=str(e))
302403

@@ -337,11 +438,17 @@ async def query_completion(
337438
Returns:
338439
CompletionResponse: Generated text completion or structured output
339440
"""
441+
# Initialize performance tracker
442+
perf = PerformanceTracker(f"Query: '{request.query[:50]}...'")
443+
340444
try:
341445
# Validate prompt overrides before proceeding
446+
perf.start_phase("prompt_validation")
342447
if request.prompt_overrides:
343448
validate_prompt_overrides_with_http_exception(request.prompt_overrides, operation_type="query")
344449

450+
# Chat history retrieval
451+
perf.start_phase("chat_history_retrieval")
345452
history_key = None
346453
history: List[Dict[str, Any]] = []
347454
if request.chat_id:
@@ -366,10 +473,13 @@ async def query_completion(
366473
)
367474

368475
# Check query limits if in cloud mode
476+
perf.start_phase("limits_check")
369477
if settings.MODE == "cloud" and auth.user_id:
370478
# Check limits before proceeding
371479
await check_and_increment_limits(auth, "query", 1)
372480

481+
# Main query processing
482+
perf.start_phase("document_service_query")
373483
response = await document_service.query(
374484
request.query,
375485
auth,
@@ -388,8 +498,11 @@ async def query_completion(
388498
request.end_user_id,
389499
request.schema,
390500
history,
501+
perf, # Pass performance tracker
391502
)
392503

504+
# Chat history storage
505+
perf.start_phase("chat_history_storage")
393506
if history_key:
394507
history.append(
395508
{
@@ -406,6 +519,9 @@ async def query_completion(
406519
history,
407520
)
408521

522+
# Log consolidated performance summary
523+
perf.log_summary(f"Generated completion with {len(response.sources) if response.sources else 0} sources")
524+
409525
return response
410526
except ValueError as e:
411527
validate_prompt_overrides_with_http_exception(operation_type="query", error=e)

0 commit comments

Comments
 (0)