Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
f7b5247
feat: enhance Google connectors indexing with content extraction and …
AnishSarkar22 Mar 25, 2026
8c41fd9
feat: add integration tests for indexing pipeline components
AnishSarkar22 Mar 25, 2026
c3d5c86
fix: update file skipping logic in Google Drive indexer
AnishSarkar22 Mar 25, 2026
bbd5ee8
feat: enhance Google Calendar event update functionality
AnishSarkar22 Mar 25, 2026
e5cb6bf
feat: implement parallel document indexing in IndexingPipelineService
AnishSarkar22 Mar 26, 2026
4fd776e
feat: implement parallel indexing for Google Calendar and Gmail conne…
AnishSarkar22 Mar 26, 2026
bd6e335
feat: enhance performance logging in indexing pipeline
AnishSarkar22 Mar 26, 2026
c016962
feat: implement parallel file downloading and indexing in Google Driv…
AnishSarkar22 Mar 26, 2026
2f30e48
feat: implement async service locking in Google Drive client
AnishSarkar22 Mar 26, 2026
7c7f8b2
feat: implement batch indexing for selected Google Drive files
AnishSarkar22 Mar 26, 2026
da6bbcf
feat: add file streaming download functionality to Google Drive client
AnishSarkar22 Mar 27, 2026
db6dd05
feat: migrate Linear and Notion indexers to unified parallel pipeline
AnishSarkar22 Mar 27, 2026
683a4c1
feat: implement thread-safe embedding access in document converters
AnishSarkar22 Mar 27, 2026
ec79142
refactor: replace document type counts atom with real-time hook
AnishSarkar22 Mar 27, 2026
7a2467c
refactor: remove type counts invalidation from document mutation atoms
AnishSarkar22 Mar 27, 2026
22e36d0
refactor: update bulk delete bar positioning and styling in Documents…
AnishSarkar22 Mar 27, 2026
0bc1c76
feat: migrate Confluence and Jira indexers to unified parallel pipeline
AnishSarkar22 Mar 27, 2026
d2a4b23
feat: enhance Google Drive client with thread-safe download and expor…
AnishSarkar22 Mar 27, 2026
00934ff
feat: enhance Google Drive client with improved logging and thread-sa…
AnishSarkar22 Mar 27, 2026
3ce831d
feat: reset indexing configurations in connector dialog
AnishSarkar22 Mar 27, 2026
4e0749f
fix: update file skipping logic for failed documents in Google Drive …
AnishSarkar22 Mar 27, 2026
3da0ffd
feat: add native Excel parsing and improve Google Drive content extra…
AnishSarkar22 Mar 27, 2026
dff8a1d
feat: add descendant checking for folder filtering in Google Drive ch…
AnishSarkar22 Mar 27, 2026
489e486
fix: revert native excel parsing
AnishSarkar22 Mar 27, 2026
6d4eb32
fix: update export format for Google Docs to use correct MIME type
AnishSarkar22 Mar 27, 2026
17091ed
Merge remote-tracking branch 'upstream/dev' into refactor/indexing-pi…
AnishSarkar22 Mar 27, 2026
ddccba0
refactor: improve UI components for folder and document management
AnishSarkar22 Mar 27, 2026
13f4b17
feat: enhance context menu functionality in DocumentNode and FolderNo…
AnishSarkar22 Mar 27, 2026
0204ed5
refactor: replace Pencil icon with PenLine and update TreePine to Cir…
AnishSarkar22 Mar 27, 2026
9654979
feat: enhance DocumentNode component with loading and error indicators
AnishSarkar22 Mar 27, 2026
0aa9cd6
feat: implement dropdown menu state management in DocumentNode and si…
AnishSarkar22 Mar 27, 2026
b5ef7af
feat: add multi-format document export functionality to editor routes…
AnishSarkar22 Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@
logger = logging.getLogger(__name__)


def _is_date_only(value: str) -> bool:
"""Return True when *value* looks like a bare date (YYYY-MM-DD) with no time component."""
return len(value) <= 10 and "T" not in value


def _build_time_body(value: str, context: dict[str, Any] | Any) -> dict[str, str]:
    """Return the ``start``/``end`` payload for a Google Calendar event.

    Values that ``_is_date_only`` accepts yield ``{"date": value}`` (all-day
    event). Anything else yields ``{"dateTime": value, "timeZone": tz}``, where
    ``tz`` is ``context["timezone"]`` when *context* is a dict carrying that
    key and ``"UTC"`` in every other case.
    """
    if not _is_date_only(value):
        timezone_name = "UTC"
        if isinstance(context, dict):
            timezone_name = context.get("timezone", "UTC")
        return {"dateTime": value, "timeZone": timezone_name}
    return {"date": value}


def create_update_calendar_event_tool(
db_session: AsyncSession | None = None,
search_space_id: int | None = None,
Expand Down Expand Up @@ -255,25 +269,13 @@ async def update_calendar_event(
if final_new_summary is not None:
update_body["summary"] = final_new_summary
if final_new_start_datetime is not None:
tz = (
context.get("timezone", "UTC")
if isinstance(context, dict)
else "UTC"
update_body["start"] = _build_time_body(
final_new_start_datetime, context
)
update_body["start"] = {
"dateTime": final_new_start_datetime,
"timeZone": tz,
}
if final_new_end_datetime is not None:
tz = (
context.get("timezone", "UTC")
if isinstance(context, dict)
else "UTC"
update_body["end"] = _build_time_body(
final_new_end_datetime, context
)
update_body["end"] = {
"dateTime": final_new_end_datetime,
"timeZone": tz,
}
if final_new_description is not None:
update_body["description"] = final_new_description
if final_new_location is not None:
Expand Down
3 changes: 2 additions & 1 deletion surfsense_backend/app/connectors/google_drive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

from .change_tracker import categorize_change, fetch_all_changes, get_start_page_token
from .client import GoogleDriveClient
from .content_extractor import download_and_process_file
from .content_extractor import download_and_extract_content, download_and_process_file
from .credentials import get_valid_credentials, validate_credentials
from .folder_manager import get_file_by_id, get_files_in_folder, list_folder_contents

__all__ = [
"GoogleDriveClient",
"categorize_change",
"download_and_extract_content",
"download_and_process_file",
"fetch_all_changes",
"get_file_by_id",
Expand Down
53 changes: 42 additions & 11 deletions surfsense_backend/app/connectors/google_drive/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Google Drive API client."""

import asyncio
import io
from typing import Any

Expand Down Expand Up @@ -35,6 +36,7 @@ def __init__(
self.connector_id = connector_id
self._credentials = credentials
self.service = None
self._service_lock = asyncio.Lock()

async def get_service(self):
"""
Expand All @@ -49,17 +51,21 @@ async def get_service(self):
if self.service:
return self.service

try:
if self._credentials:
credentials = self._credentials
else:
credentials = await get_valid_credentials(
self.session, self.connector_id
)
self.service = build("drive", "v3", credentials=credentials)
return self.service
except Exception as e:
raise Exception(f"Failed to create Google Drive service: {e!s}") from e
async with self._service_lock:
if self.service:
return self.service

try:
if self._credentials:
credentials = self._credentials
else:
credentials = await get_valid_credentials(
self.session, self.connector_id
)
self.service = build("drive", "v3", credentials=credentials)
return self.service
except Exception as e:
raise Exception(f"Failed to create Google Drive service: {e!s}") from e

async def list_files(
self,
Expand Down Expand Up @@ -166,6 +172,31 @@ async def download_file(self, file_id: str) -> tuple[bytes | None, str | None]:
except Exception as e:
return None, f"Error downloading file: {e!s}"

async def download_file_to_disk(
    self,
    file_id: str,
    dest_path: str,
    chunksize: int = 5 * 1024 * 1024,
) -> str | None:
    """Write a Drive file's media content to *dest_path* chunk by chunk.

    The content is fetched through ``MediaIoBaseDownload`` in pieces of
    *chunksize* bytes and written straight to the open file, so the whole
    payload is never buffered in memory at once.

    Args:
        file_id: ID of the Drive file to fetch.
        dest_path: Local path that is opened for binary writing.
        chunksize: Size in bytes of each download chunk (5 MiB by default).

    Returns:
        ``None`` on success, otherwise an error message describing the failure.
    """
    try:
        drive = await self.get_service()
        media_request = drive.files().get_media(fileId=file_id)
        from googleapiclient.http import MediaIoBaseDownload

        with open(dest_path, "wb") as out_file:
            fetcher = MediaIoBaseDownload(
                out_file, media_request, chunksize=chunksize
            )
            # next_chunk() reports completion through its second return value.
            while True:
                _progress, finished = fetcher.next_chunk()
                if finished:
                    break
    except HttpError as http_err:
        return f"HTTP error downloading file: {http_err.resp.status}"
    except Exception as exc:
        return f"Error downloading file: {exc!s}"
    return None

async def export_google_file(
self, file_id: str, mime_type: str
) -> tuple[bytes | None, str | None]:
Expand Down
159 changes: 159 additions & 0 deletions surfsense_backend/app/connectors/google_drive/content_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,165 @@
logger = logging.getLogger(__name__)


async def download_and_extract_content(
    client: GoogleDriveClient,
    file: dict[str, Any],
) -> tuple[str | None, dict[str, Any], str | None]:
    """Download a Google Drive file and extract its content as markdown.

    ETL only -- no DB writes, no indexing, no summarization.

    Google Workspace files are exported with ``client.export_google_file``;
    every other file is streamed to a temporary file with
    ``client.download_file_to_disk``. The temporary file is handed to
    ``_parse_file_to_markdown`` and removed afterwards. Exceptions raised
    while downloading or parsing are caught and reported through the returned
    error message instead of propagating.

    Args:
        client: Drive client used for the export / download.
        file: Drive file resource. ``id``, ``name`` and ``mimeType`` are read,
            plus the optional ``modifiedTime``, ``createdTime``, ``size``,
            ``webViewLink`` and ``md5Checksum`` fields.

    Returns:
        (markdown_content, drive_metadata, error_message)
        On success error_message is None. On failure markdown_content is
        None; drive_metadata is empty only when the file was skipped.
    """
    file_id = file.get("id")
    file_name = file.get("name", "Unknown")
    mime_type = file.get("mimeType", "")

    if should_skip_file(mime_type):
        return None, {}, f"Skipping {mime_type}"

    logger.info(f"Downloading file for content extraction: {file_name} ({mime_type})")

    is_workspace = is_google_workspace_file(mime_type)

    drive_metadata: dict[str, Any] = {
        "google_drive_file_id": file_id,
        "google_drive_file_name": file_name,
        "google_drive_mime_type": mime_type,
        "source_connector": "google_drive",
    }
    # Copy optional Drive fields across only when the API returned them.
    optional_fields = (
        ("modifiedTime", "modified_time"),
        ("createdTime", "created_time"),
        ("size", "file_size"),
        ("webViewLink", "web_view_link"),
        ("md5Checksum", "md5_checksum"),
    )
    for drive_key, metadata_key in optional_fields:
        if drive_key in file:
            drive_metadata[metadata_key] = file[drive_key]
    if is_workspace:
        # NOTE(review): the export below may produce ".txt" rather than PDF,
        # in which case "pdf" is inaccurate -- confirm what consumers expect.
        drive_metadata["exported_as"] = "pdf"
        drive_metadata["original_workspace_type"] = mime_type.split(".")[-1]

    temp_file_path = None
    try:
        # Name used by the parser to pick a strategy (it routes on extension).
        parse_name = file_name

        if is_workspace:
            # Workspace files (Docs/Sheets/Slides) use export -- returns bytes
            # in one shot. These are typically small (a few MB as PDF/text).
            export_mime = get_export_mime_type(mime_type)
            if not export_mime:
                return None, drive_metadata, f"Cannot export Google Workspace type: {mime_type}"
            content_bytes, error = await client.export_google_file(file_id, export_mime)
            if error:
                return None, drive_metadata, error
            extension = ".pdf" if export_mime == "application/pdf" else ".txt"

            # The Drive title says nothing about the exported format: a Doc
            # titled "notes.txt" exported as PDF must not be read as plain
            # text. Make the routing name end with the real export extension.
            if not file_name.lower().endswith(extension):
                parse_name = f"{file_name}{extension}"

            with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp:
                tmp.write(content_bytes)
                temp_file_path = tmp.name
        else:
            # Binary files -- stream directly to disk in chunks to avoid
            # loading the entire file into memory.
            extension = Path(file_name).suffix or ".bin"
            with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp:
                temp_file_path = tmp.name

            error = await client.download_file_to_disk(file_id, temp_file_path)
            if error:
                return None, drive_metadata, error

        markdown = await _parse_file_to_markdown(temp_file_path, parse_name)
        return markdown, drive_metadata, None

    except Exception as e:
        logger.warning(f"Failed to extract content from {file_name}: {e!s}")
        return None, drive_metadata, str(e)
    finally:
        # Best-effort cleanup: never let a failed unlink mask the real result,
        # but leave a trace so leaked temp files can be diagnosed.
        if temp_file_path and os.path.exists(temp_file_path):
            try:
                os.unlink(temp_file_path)
            except Exception as cleanup_err:
                logger.debug(
                    f"Could not remove temp file {temp_file_path}: {cleanup_err!s}"
                )


async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
"""Parse a local file to markdown using the configured ETL service."""
lower = filename.lower()

if lower.endswith((".md", ".markdown", ".txt")):
with open(file_path, encoding="utf-8") as f:
return f.read()

if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")):
from app.config import config as app_config
from litellm import atranscription

stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
result = stt_service.transcribe_file(file_path)
text = result.get("text", "")
else:
with open(file_path, "rb") as audio_file:
kwargs: dict[str, Any] = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
resp = await atranscription(**kwargs)
text = resp.get("text", "")

if not text:
raise ValueError("Transcription returned empty text")
return f"# Transcription of {filename}\n\n{text}"

# Document files -- use configured ETL service
from app.config import config as app_config

if app_config.ETL_SERVICE == "UNSTRUCTURED":
from langchain_unstructured import UnstructuredLoader
from app.utils.document_converters import convert_document_to_markdown

loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
return await convert_document_to_markdown(docs)

if app_config.ETL_SERVICE == "LLAMACLOUD":
from app.tasks.document_processors.file_processors import (
parse_with_llamacloud_retry,
)

result = await parse_with_llamacloud_retry(file_path=file_path, estimated_pages=50)
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
if not markdown_documents:
raise RuntimeError(f"LlamaCloud returned no documents for {filename}")
return markdown_documents[0].text

if app_config.ETL_SERVICE == "DOCLING":
from docling.document_converter import DocumentConverter

converter = DocumentConverter()
result = converter.convert(file_path)
return result.document.export_to_markdown()

raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")


async def download_and_process_file(
client: GoogleDriveClient,
file: dict[str, Any],
Expand Down
11 changes: 9 additions & 2 deletions surfsense_backend/app/indexing_pipeline/document_hashing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@
from app.indexing_pipeline.connector_document import ConnectorDocument


def compute_identifier_hash(
    document_type_value: str, unique_id: str, search_space_id: int
) -> str:
    """Hash the three identity components into one stable hex digest.

    The parts are joined as
    ``"<document_type_value>:<unique_id>:<search_space_id>"``, encoded as
    UTF-8 and hashed with SHA-256; the hexadecimal digest is returned.
    """
    identity_key = "{}:{}:{}".format(document_type_value, unique_id, search_space_id)
    digest = hashlib.sha256()
    digest.update(identity_key.encode("utf-8"))
    return digest.hexdigest()


def compute_unique_identifier_hash(doc: ConnectorDocument) -> str:
    """Return a stable SHA-256 hash identifying a document by its source identity.

    Delegates to ``compute_identifier_hash`` with the document's
    ``document_type.value``, ``unique_id`` and ``search_space_id`` so that
    both entry points share a single hashing scheme.
    """
    return compute_identifier_hash(
        doc.document_type.value, doc.unique_id, doc.search_space_id
    )


def compute_content_hash(doc: ConnectorDocument) -> str:
Expand Down
Loading