|
20 | 20 | from core.models.auth import AuthContext |
21 | 21 | from core.models.chunk import Chunk |
22 | 22 | from core.models.documents import Document |
| 23 | +from core.models.folders import Folder |
23 | 24 | from core.parser.docling_v2 import DoclingV2Parser |
24 | 25 | from core.parser.morphik_parser import MorphikParser |
25 | 26 | from core.storage.base_storage import BaseStorage |
26 | 27 | from core.storage.utils_file_extensions import detect_content_type |
27 | | -from core.utils.folder_utils import normalize_ingest_folder_inputs |
| 28 | +from core.utils.folder_utils import normalize_folder_path, normalize_ingest_folder_inputs |
28 | 29 | from core.utils.typed_metadata import MetadataBundle, normalize_metadata |
29 | 30 | from core.vector_store.chunk_v2_store import ChunkV2Store |
30 | 31 |
|
@@ -223,6 +224,81 @@ def _reset_processing_metadata(system_metadata: Optional[Dict[str, Any]]) -> Dic |
223 | 224 | cleaned_metadata["updated_at"] = datetime.now(UTC) |
224 | 225 | return cleaned_metadata |
225 | 226 |
|
@staticmethod
def _folder_update_fields(folder_obj: Folder) -> Dict[str, Any]:
    """Return the folder-related column values to write for ``folder_obj``.

    The mapping carries ``folder_id``, ``folder_path`` and ``folder_name``.
    ``folder_path`` is the folder's ``full_path`` when that is set; otherwise
    it is the normalized folder name, or ``None`` when the name is empty too.
    If normalization raises ``ValueError`` the raw name is used as the path.
    """
    name = folder_obj.name

    try:
        resolved_path = folder_obj.full_path
        if not resolved_path:
            # No stored path: derive one from the name when there is a name.
            resolved_path = normalize_folder_path(name) if name else None
    except ValueError:
        # Name could not be normalized; keep it verbatim as the path.
        resolved_path = name

    updates: Dict[str, Any] = {"folder_id": folder_obj.id}
    updates["folder_path"] = resolved_path
    updates["folder_name"] = name
    return updates
| 240 | + |
async def _ensure_folder_exists(self, folder_path: str, document_id: str, auth: AuthContext) -> Optional[Folder]:
    """
    Ensure a folder path exists (creating ancestors as needed) and add the document to the leaf.

    Args:
        folder_path: Folder path to ingest into; normalized with ``normalize_folder_path`` first.
        document_id: Identifier of the document to attach to the leaf folder.
        auth: Auth context passed to every database call; its ``app_id`` is set on created folders.

    Returns:
        The leaf folder (pre-existing or newly created), or ``None`` when the path is the root
        folder, when no leaf could be determined, or when any step raises. Failures are logged
        and never propagated to the caller.
    """
    try:
        canonical_path = normalize_folder_path(folder_path)

        if canonical_path == "/":
            # Report once and bail out; raising here would only be caught and
            # logged a second time by the catch-all handler below.
            logger.error("Cannot ingest into root folder '/'")
            return None

        segments = canonical_path.strip("/").split("/") if canonical_path else []
        last_idx = len(segments) - 1

        parent_id: Optional[str] = None
        current_path_parts: List[str] = []
        target_folder: Optional[Folder] = None

        # Walk the path from the top, reusing folders that exist and creating the rest.
        for idx, segment in enumerate(segments):
            is_leaf = idx == last_idx
            current_path_parts.append(segment)
            current_path = "/" + "/".join(current_path_parts)

            existing = await self.db.get_folder_by_full_path(current_path, auth)
            if existing:
                parent_id = existing.id
                if is_leaf:
                    target_folder = existing
                continue

            folder = Folder(
                name=segment,
                full_path=current_path,
                parent_id=parent_id,
                depth=idx + 1,
                # A freshly created leaf is seeded with the document, so the
                # membership check after the loop skips the extra add call.
                document_ids=[document_id] if is_leaf else [],
                app_id=auth.app_id,
            )
            await self.db.create_folder(folder, auth)
            parent_id = folder.id
            if is_leaf:
                target_folder = folder

        if target_folder is None:
            logger.error("Failed to ensure target folder for path %s", canonical_path)
            return None

        if document_id not in (target_folder.document_ids or []):
            success = await self.db.add_document_to_folder(target_folder.id, document_id, auth)
            if not success:
                logger.warning(
                    "Failed to add document %s to folder %s. This may be due to a race condition.",
                    document_id,
                    target_folder.name,
                )
            else:
                logger.info("Successfully added document %s to folder %s", document_id, target_folder.name)

        return target_folder

    except Exception as exc:  # noqa: BLE001
        # Best-effort by design: log with traceback and signal failure via None.
        logger.exception("Error ensuring folder exists: %s", exc)
        return None
| 301 | + |
226 | 302 | @classmethod |
227 | 303 | def _build_ingestion_job_payload( |
228 | 304 | cls, |
@@ -589,6 +665,22 @@ async def ingest_document( |
589 | 665 | await self._mark_document_failed(doc, auth, f"Storage upload failed: {exc}") |
590 | 666 | raise HTTPException(status_code=500, detail=f"Failed to upload file to storage: {exc}") from exc |
591 | 667 |
|
| 668 | + if folder_path_value: |
| 669 | + try: |
| 670 | + folder_obj = await self._ensure_folder_exists(folder_path_value, doc.external_id, auth) |
| 671 | + if folder_obj and folder_obj.id: |
| 672 | + doc.folder_id = folder_obj.id |
| 673 | + folder_updates = self._folder_update_fields(folder_obj) |
| 674 | + await self.db.update_document(doc.external_id, folder_updates, auth=auth) |
| 675 | + logger.debug("Ensured folder '%s' exists and contains document %s", folder_path_value, doc.external_id) |
| 676 | + except Exception as exc: # noqa: BLE001 |
| 677 | + logger.error( |
| 678 | + "Error ensuring folder exists for doc %s (path=%s): %s", |
| 679 | + doc.external_id, |
| 680 | + folder_path_value, |
| 681 | + exc, |
| 682 | + ) |
| 683 | + |
592 | 684 | if queue is None: |
593 | 685 | queue = redis is not None |
594 | 686 |
|
|
0 commit comments