Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Current (in progress)

- Change CSW harvesters XML processor from lxml to saxonche [#3380](https://github.com/opendatateam/udata/pull/3380)
- **breaking change** feat: topic elements [#3318](https://github.com/opendatateam/udata/pull/3318) [#3416](https://github.com/opendatateam/udata/pull/3416) [#3417](https://github.com/opendatateam/udata/pull/3417) [#3418](https://github.com/opendatateam/udata/pull/3418)
- **breaking change** feat: topic elements [#3318](https://github.com/opendatateam/udata/pull/3318) [#3416](https://github.com/opendatateam/udata/pull/3416) [#3417](https://github.com/opendatateam/udata/pull/3417) [#3418](https://github.com/opendatateam/udata/pull/3418) [#3423](https://github.com/opendatateam/udata/pull/3423)
- Allow to send arguments to the `udata user create` command for CI [#3419](https://github.com/opendatateam/udata/pull/3419)

## 10.9.0 (2025-08-28)
Expand Down
23 changes: 22 additions & 1 deletion udata/core/topic/apiv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import mongoengine
from flask import request
from flask_security import current_user
from mongoengine.signals import post_delete

from udata.api import API, api, apiv2
from udata.core.discussions.models import Discussion
Expand All @@ -17,10 +18,13 @@
from udata.core.topic.models import Topic, TopicElement
from udata.core.topic.parsers import TopicApiParser, TopicElementsParser
from udata.core.topic.permissions import TopicEditPermission
from udata.search import batch_reindex
from udata.tasks import as_task_param

apiv2.inherit("ModelReference", api.model_reference)

DEFAULT_SORTING = "-created_at"
DELETE_REINDEX_BATCH_SIZE = 500

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -157,7 +161,24 @@ def delete(self, topic):
if not TopicEditPermission(topic).can():
apiv2.abort(403, "Forbidden")

topic.elements.delete()
# Collect element references for batch reindexing before deletion
elements_to_reindex = [
as_task_param(element.element)
for element in TopicElement.objects(topic=topic).only("element")
if element.element and hasattr(element.element, "id")
]

# Temporarily disconnect post_delete signal to avoid individual reindex tasks
post_delete.disconnect(TopicElement.post_delete, sender=TopicElement)
try:
TopicElement.objects(topic=topic).delete()
# Process reindexing in batches
for i in range(0, len(elements_to_reindex), DELETE_REINDEX_BATCH_SIZE):
batch = elements_to_reindex[i : i + DELETE_REINDEX_BATCH_SIZE]
batch_reindex.delay(batch)
finally:
# Always reconnect the signal
post_delete.connect(TopicElement.post_delete, sender=TopicElement)

Comment on lines +171 to 182
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big fan of this way of doing it. As I understand the Python server, the global state is shared between clients? So what happens if someone deletes an element from another topic during the reindex of this one? Does the signal trigger?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, true, it's process-wide AFAIK. This could create some unwanted side effects.

return None, 204

Expand Down
5 changes: 4 additions & 1 deletion udata/core/topic/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ def post_save(cls, sender, document, **kwargs):

@classmethod
def post_delete(cls, sender, document, **kwargs):
    """
    Trigger a reindex of the referenced element when a TopicElement is deleted.

    ⚠️ This logic is also implemented in batch in `topic_elements_delete`.
    """
    # Only reindex when the element still resolves to a persisted document:
    # dangling references expose no `id` and cannot be reindexed.
    if document.topic and document.element and hasattr(document.element, "id"):
        reindex.delay(*as_task_param(document.element))

Expand Down
14 changes: 14 additions & 0 deletions udata/search/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ def unindex(classname, id):
log.exception('Unable to unindex %s "%s"', model.__name__, id)


@task(route="high.search")
def batch_reindex(document_refs: list[tuple[str, str]]):
    """
    Reindex several documents within a single task.

    :param document_refs: ``(classname, document id)`` pairs, as produced by
        ``as_task_param``.

    Does nothing when no search service is configured. A failure on one
    document is logged and does not abort the rest of the batch.
    """
    if not current_app.config["SEARCH_SERVICE_API_URL"]:
        return

    log.info("Batch reindexing %d documents", len(document_refs))
    for classname, document_id in document_refs:
        try:
            reindex(classname, document_id)
        except Exception:
            log.exception('Unable to reindex %s "%s"', classname, document_id)


def reindex_model_on_save(sender, document, **kwargs):
"""(Re/Un)Index Mongo document on post_save"""
if current_app.config.get("AUTO_INDEX") and current_app.config["SEARCH_SERVICE_API_URL"]:
Expand Down
39 changes: 39 additions & 0 deletions udata/tests/apiv2/test_topics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from unittest.mock import patch

import pytest
from flask import url_for

Expand Down Expand Up @@ -555,6 +557,43 @@ def test_clear_elements(self):
topic.reload()
self.assertEqual(len(topic.elements), 0)

def test_clear_elements_batch_reindex(self):
    """It should batch reindex elements when clearing a topic to prevent Redis exhaustion"""
    owner = self.login()
    topic = TopicFactory(owner=owner)

    # Attach 7 dataset elements so a batch size of 3 yields batches of 3 + 3 + 1.
    for dataset in [DatasetFactory() for _ in range(7)]:
        TopicElementDatasetFactory(topic=topic, element=dataset)
    self.assertEqual(len(topic.elements), 7)

    # Shrink the batch size for the test and capture both reindex entry points.
    with (
        patch("udata.core.topic.apiv2.DELETE_REINDEX_BATCH_SIZE", 3),
        patch("udata.core.topic.apiv2.batch_reindex") as mock_batch_reindex,
        patch("udata.search.reindex") as mock_reindex,
    ):
        self.assert204(self.delete(url_for("apiv2.topic_elements", topic=topic)))

        # Every element was removed from the topic.
        topic.reload()
        self.assertEqual(len(topic.elements), 0)

        # The post_delete signal was disconnected: no per-element reindex task.
        self.assertEqual(mock_reindex.delay.call_count, 0)

        # Reindexing went through batch_reindex.delay in 3 batches of 3 + 3 + 1.
        batches = [call[0][0] for call in mock_batch_reindex.delay.call_args_list]
        self.assertEqual(len(batches), 3)
        self.assertEqual([len(batch) for batch in batches], [3, 3, 1])


class TopicElementAPITest(APITestCase):
def test_delete_element(self):
Expand Down