feat: refactor indexing pipelines for some connectors#996
Conversation
…document migration - Added `download_and_extract_content` function to extract content from Google Drive files as markdown. - Updated Google Drive indexer to utilize the new content extraction method. - Implemented document migration logic to update legacy Composio document types to their native Google types. - Introduced identifier hashing for stable document identification. - Improved file pre-filtering to handle unchanged and rename-only files efficiently.
- Introduced integration tests for Calendar, Drive, and Gmail indexers to ensure proper document creation and migration. - Added tests for batch indexing functionality to validate the processing of multiple documents. - Implemented tests for legacy document migration to verify updates to document types and hashes. - Enhanced test coverage for the IndexingPipelineService to ensure robust functionality across various document types.
- Modified the `_should_skip_file` function to prevent skipping of documents with a FAILED status, ensuring they are reprocessed even if their content remains unchanged. - Added a new integration test to verify that FAILED documents are not skipped during the indexing process.
- Introduced helper functions `_is_date_only` and `_build_time_body` to streamline the construction of event start and end times for all-day and timed events. - Refactored the `create_update_calendar_event_tool` to utilize the new helper functions, improving code readability and maintainability. - Updated the Google Calendar sync service to ensure proper handling of calendar IDs with a default fallback to "primary". - Modified the ApprovalCard component to simplify the construction of event update arguments, enhancing clarity and reducing redundancy.
- Added `index_batch_parallel` method to enable concurrent indexing of documents with bounded concurrency, improving performance and efficiency. - Refactored existing indexing logic to utilize `asyncio.to_thread` for non-blocking execution of embedding and chunking functions. - Introduced unit tests to validate the functionality of the new parallel indexing method, ensuring robustness and error handling during document processing.
…ctors - Refactored Google Calendar and Gmail indexers to utilize the new `index_batch_parallel` method for concurrent document indexing, enhancing performance. - Updated the indexing logic to replace serial processing with parallel execution, allowing for improved efficiency in handling multiple documents. - Adjusted logging and error handling to accommodate the new parallel processing approach, ensuring robust operation during indexing. - Enhanced unit tests to validate the functionality of the parallel indexing method and its integration with existing workflows.
- Added performance logging to the `index_batch_parallel` method, capturing metrics for document indexing duration and concurrency. - Introduced timing measurements for both the overall indexing process and the parallel document gathering phase, improving observability of the indexing workflow. - Updated logging statements to provide detailed insights into the number of documents processed, indexed, and failed during the indexing operation.
…e indexer - Added `_download_files_parallel` function to enable concurrent downloading of files from Google Drive, improving efficiency in document processing. - Introduced `_download_and_index` function to handle the parallel downloading and indexing phases, streamlining the overall workflow. - Updated `_index_full_scan` and `_index_with_delta_sync` methods to utilize the new parallel downloading functionality, enhancing performance. - Added unit tests to validate the new parallel downloading and indexing logic, ensuring robustness and error handling during document processing.
- Introduced an asyncio lock to the GoogleDriveClient to ensure thread-safe access to the service instance. - Refactored the get_service method to utilize the lock, preventing concurrent attempts to create the service and improving stability in multi-threaded environments.
- Introduced `index_google_drive_selected_files` function to enable indexing of multiple user-selected files in parallel, improving efficiency. - Refactored existing indexing logic to handle batch processing, including error handling for individual file failures. - Added unit tests for the new batch indexing functionality, ensuring robustness and proper error collection during the indexing process.
- Introduced `download_file_to_disk` method to stream files directly to disk in chunks, reducing memory usage during downloads. - Updated `download_and_extract_content` function to utilize the new streaming download method for binary files, enhancing efficiency in handling large files. - Improved error handling for download operations, providing clearer feedback on failures.
- Refactored Linear and Notion indexers to utilize the shared IndexingPipelineService for improved document deduplication, summarization, chunking, and embedding with bounded parallel indexing. - Updated the `_build_connector_doc` function in both indexers to create ConnectorDocument instances with enhanced metadata and fallback summaries. - Modified the `index_linear_issues` and `index_notion_pages` functions to return a tuple of (indexed_count, skipped_count, warning_or_error_message) for better error handling and reporting. - Added unit tests for both indexers to validate the new parallel processing logic and ensure correct document creation and indexing behavior.
- Added a reentrant lock to ensure thread-safe access to the tokenizer and embedding model, preventing runtime errors during concurrent operations. - Updated the `truncate_for_embedding` and `embed_text` functions to utilize the lock, ensuring safe execution in multi-threaded environments. - Enhanced the `embed_texts` function to maintain thread safety while processing multiple texts for embedding.
- Removed the `documentTypeCountsAtom` and its associated logic from the document query atoms. - Introduced `useZeroDocumentTypeCounts` hook to provide real-time document type counts, enhancing responsiveness as documents are indexed. - Updated components to utilize the new hook for fetching document type counts, ensuring instant updates in the UI.
- Refactored Confluence and Jira indexers to utilize the shared IndexingPipelineService for improved document processing. - Updated the `_build_connector_doc` function in both indexers to create ConnectorDocument instances with enhanced metadata and fallback summaries. - Modified the `index_confluence_pages` and `index_jira_issues` functions to return a tuple of (indexed_count, skipped_count, warning_or_error_message) for better error handling and reporting. - Added unit tests for both indexers to validate the new parallel processing logic and ensure correct document creation and indexing behavior.
|
@AnishSarkar22 is attempting to deploy a commit to the Rohan Verma's projects Team on Vercel. A member of the Team first needs to authorize it. |
There was a problem hiding this comment.
Review by RecurseML
🔍 Review performed on c7ace83..0bc1c76
✨ No bugs found, your code is sparkling clean
✅ Files analyzed, no issues (41)
• surfsense_backend/app/agents/new_chat/tools/google_calendar/update_event.py
• surfsense_backend/app/connectors/google_drive/__init__.py
• surfsense_backend/app/connectors/google_drive/client.py
• surfsense_backend/app/connectors/google_drive/content_extractor.py
• surfsense_backend/app/indexing_pipeline/document_hashing.py
• surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
• surfsense_backend/app/routes/search_source_connectors_routes.py
• surfsense_backend/app/services/google_calendar/kb_sync_service.py
• surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py
• surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
• surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
• surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
• surfsense_backend/app/tasks/connector_indexers/jira_indexer.py
• surfsense_backend/app/tasks/connector_indexers/linear_indexer.py
• surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
• surfsense_backend/app/utils/document_converters.py
• surfsense_backend/tests/integration/indexing_pipeline/test_calendar_pipeline.py
• surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py
• surfsense_backend/tests/integration/indexing_pipeline/test_gmail_pipeline.py
• surfsense_backend/tests/integration/indexing_pipeline/test_index_batch.py
• surfsense_backend/tests/integration/indexing_pipeline/test_migrate_legacy_docs.py
• surfsense_backend/tests/unit/connector_indexers/conftest.py
• surfsense_backend/tests/unit/connector_indexers/test_confluence_parallel.py
• surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py
• surfsense_backend/tests/unit/connector_indexers/test_jira_parallel.py
• surfsense_backend/tests/unit/connector_indexers/test_linear_parallel.py
• surfsense_backend/tests/unit/connector_indexers/test_notion_parallel.py
• surfsense_backend/tests/unit/indexing_pipeline/test_document_hashing.py
• surfsense_backend/tests/unit/indexing_pipeline/test_index_batch.py
• surfsense_backend/tests/unit/indexing_pipeline/test_index_batch_parallel.py
• surfsense_backend/tests/unit/indexing_pipeline/test_migrate_legacy_docs.py
• surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx
• surfsense_web/atoms/documents/document-mutation.atoms.ts
• surfsense_web/atoms/documents/document-query.atoms.ts
• surfsense_web/components/assistant-ui/connector-popup.tsx
• surfsense_web/components/hitl-edit-panel/hitl-edit-panel.tsx
• surfsense_web/components/onboarding-tour.tsx
• surfsense_web/components/tool-ui/google-calendar/update-event.tsx
• surfsense_web/contracts/enums/toolIcons.tsx
• surfsense_web/hooks/use-zero-document-type-counts.ts
• surfsense_web/lib/query-client/cache-keys.ts
⏭️ Files skipped (1)
| Locations |
|---|
surfsense_backend/tests/unit/connector_indexers/__init__.py |
…t methods - Implemented per-thread HTTP transport for concurrent downloads to ensure thread safety. - Refactored `download_file` and `download_file_to_disk` methods to utilize blocking calls on separate threads, improving performance during file operations. - Added logging to track the start and end of download and export processes, providing better visibility into execution time. - Updated unit tests to verify parallel execution of download and export operations, ensuring efficiency in handling multiple requests.
…fe operations - Added logging to track the start and end of file download and export processes, improving visibility into execution time. - Implemented per-thread HTTP transport for concurrent downloads and exports, ensuring thread safety. - Refactored download and export methods to utilize resolved credentials, enhancing functionality. - Updated unit tests to validate the new threading and logging features, ensuring robust parallel execution.
…indexer - Modified the `_should_skip_file` function to skip previously failed documents during processing, improving error handling. - Updated the corresponding test to reflect the new behavior, ensuring that failed documents are correctly identified and skipped during automatic sync.
…ction - Introduced a new utility for parsing .xlsx files into markdown format, enhancing the ability to process Excel documents natively. - Updated the Google Drive content extractor to utilize the new Excel parsing functionality, allowing for better handling of spreadsheet files. - Enhanced file type detection and export logic to support various document formats, improving overall content extraction accuracy. - Added unit tests to ensure the correctness of the new Excel parsing feature and its integration with existing content extraction workflows.
…clePlus in document components
… and UI components - Implemented a new export endpoint in the backend to support exporting documents in various formats (PDF, DOCX, HTML, LaTeX, EPUB, ODT, plain text). - Enhanced DocumentNode and FolderTreeView components to include export options in context and dropdown menus. - Created shared ExportMenuItems component for consistent export options across the application. - Integrated loading indicators for export actions to improve user experience.
Description
handleStartIndexingandhandleSkipIndexing..docxand.xlsxfile types by detecting them and forwarding to the ETL service.Motivation and Context
FIX #
Screenshots
API Changes
Change Type
Testing Performed
Checklist
High-level PR Summary
This PR refactors the connector indexing pipeline to use a unified parallel indexing architecture across Google Drive, Confluence, Jira, Linear, Notion, and Gmail connectors. The core change introduces
IndexingPipelineServicewith parallel document processing (index_batch_parallel) that uses bounded concurrency and isolated database sessions for each document. Drive indexer now streams large files directly to disk and parallelizes download/ETL/indexing phases. Legacy Composio connector documents are automatically migrated to native types usingNATIVE_TO_LEGACY_DOCTYPEmappings. The frontend switches from REST API polling to Zero real-time sync for document type counts, eliminating unnecessary cache invalidations. Embedding operations are protected with a reentrant lock to prevent tokenizer thread-safety issues. Google Calendar event updates now properly handle all-day events via_build_time_bodyhelper.⏱️ Estimated Review Time: 3+ hours
💡 Review Order Suggestion
surfsense_backend/app/indexing_pipeline/document_hashing.pysurfsense_backend/app/db/__init__.pysurfsense_backend/app/indexing_pipeline/connector_document.pysurfsense_backend/app/indexing_pipeline/indexing_pipeline_service.pysurfsense_backend/app/connectors/google_drive/content_extractor.pysurfsense_backend/app/connectors/google_drive/client.pysurfsense_backend/app/connectors/google_drive/__init__.pysurfsense_backend/app/tasks/connector_indexers/google_drive_indexer.pysurfsense_backend/app/tasks/connector_indexers/confluence_indexer.pysurfsense_backend/app/tasks/connector_indexers/jira_indexer.pysurfsense_backend/app/tasks/connector_indexers/linear_indexer.pysurfsense_backend/app/tasks/connector_indexers/notion_indexer.pysurfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.pysurfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.pysurfsense_backend/app/routes/search_source_connectors_routes.pysurfsense_backend/app/utils/document_converters.pysurfsense_backend/app/agents/new_chat/tools/google_calendar/update_event.pysurfsense_backend/app/services/google_calendar/kb_sync_service.pysurfsense_backend/tests/integration/indexing_pipeline/test_calendar_pipeline.pysurfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.pysurfsense_backend/tests/integration/indexing_pipeline/test_gmail_pipeline.pysurfsense_backend/tests/integration/indexing_pipeline/test_index_batch.pysurfsense_backend/tests/unit/indexing_pipeline/test_document_hashing.pysurfsense_backend/tests/unit/indexing_pipeline/test_index_batch.pysurfsense_backend/tests/unit/indexing_pipeline/test_index_batch_parallel.pysurfsense_backend/tests/unit/indexing_pipeline/test_migrate_legacy_docs.pysurfsense_backend/tests/unit/connector_indexers/test_confluence_parallel.pysurfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.pysurfsense_backend/tests/unit/connector_indexers/test_jira_parallel.pysurfsense_backend/tests/unit/connector_indexers/test_linear_parallel.pysurfsense_backend/tests/unit/connector_indexers/test_notion_parallel.pysurfsense_web/hooks/use-zero-document-type-counts.tssurfsense_web/components/onboarding-tour.tsxsurfsense_web/components/assistant-ui/connector-popup.tsxsurfsense_web/atoms/documents/document-mutation.atoms.tssurfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsxsurfsense_web/components/tool-ui/google-calendar/update-event.tsxsurfsense_web/components/hitl-edit-panel/hitl-edit-panel.tsxsurfsense_web/contracts/enums/toolIcons.tsxsurfsense_web/contracts/enums/toolIcons.tsxsurfsense_web/components/hitl-edit-panel/hitl-edit-panel.tsxsurfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsxsurfsense_web/components/tool-ui/google-calendar/update-event.tsx