Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 60 additions & 3 deletions tests/test_search_provider.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# mypy: ignore-errors
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch

import pytest

from yente import settings
from yente.data.entity import Entity
from yente.exc import YenteIndexError, YenteNotFoundError
from yente.provider import SearchProvider
from yente.provider.elastic import ElasticSearchProvider
from yente.provider.opensearch import OpenSearchProvider
from yente.search.indexer import build_indexable_entity_doc
from yente.search.mapping import INDEX_SETTINGS, make_entity_mapping

# Constants for testing
Expand Down Expand Up @@ -153,10 +156,64 @@ async def test_clone_index_failure_restores_read_only(search_provider: SearchPro
index_settings = resp[source]["settings"]["index"]
blocks = index_settings.get("blocks", {})
read_only = blocks.get("read_only", "false")
assert (
str(read_only).lower() != "true"
), f"Source index is still read-only after failed clone: {read_only}"
assert str(read_only).lower() != "true", (
f"Source index is still read-only after failed clone: {read_only}"
)

# Clean up
await search_provider.delete_index(source)
await search_provider.delete_index(target)


@pytest.mark.asyncio
async def test_get_index_max_date(search_provider: SearchProvider):
    """get_index_max_date returns None on an empty index and the newest
    last_seen (as an epoch second) once documents are present."""
    temp_index = settings.ENTITY_INDEX + "-max-date-test"
    await search_provider.create_index(
        temp_index, mappings=TEST_MAPPINGS, settings=TEST_SETTINGS
    )

    # With no documents indexed, the aggregation yields no value.
    await search_provider.refresh(temp_index)
    assert await search_provider.get_index_max_date(temp_index, "last_seen") is None

    older_last_seen = "2024-01-15T08:00:00"
    newer_last_seen = "2024-06-01T10:57:03"

    def make_person(entity_id, name, last_seen):
        # Build a minimal Person entity whose last_seen/last_change are fixed.
        return Entity.from_dict(
            {
                "id": entity_id,
                "schema": "Person",
                "properties": {"name": [name]},
                "datasets": ["test"],
                "first_seen": "2024-01-01T00:00:00",
                "last_seen": last_seen,
                "last_change": last_seen,
            }
        )

    entities = [
        make_person("test-max-date-1", "Alice Test", older_last_seen),
        make_person("test-max-date-2", "Bob Test", newer_last_seen),
    ]
    await search_provider.bulk_index(
        [
            {"_index": temp_index, "_id": e.id, "_source": build_indexable_entity_doc(e)}
            for e in entities
        ]
    )
    await search_provider.refresh(temp_index)

    # The newest last_seen should come back as a UTC Unix timestamp in seconds.
    expected = int(
        datetime.fromisoformat(newer_last_seen).replace(tzinfo=timezone.utc).timestamp()
    )
    assert await search_provider.get_index_max_date(temp_index, "last_seen") == expected

    await search_provider.delete_index(temp_index)
37 changes: 37 additions & 0 deletions yente/data/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from datetime import datetime, timezone

from opentelemetry import metrics

from yente.logs import get_logger
from yente.provider.base import SearchProvider

log = get_logger(__name__)

# Module-level OpenTelemetry meter for dataset metrics emitted by yente.data.
_meter = metrics.get_meter("yente.data")
# Ideally we'd be reading dataset last_export from the index, but once a new index
# has been loaded, that information is no longer available to us — so we derive it
# by max-querying last_seen across all documents in the index.
_indexed_dataset_version_time = _meter.create_gauge(
    "indexed_dataset_version_time",
    unit="s",
    description="Unix timestamp of the most recent last_seen value for each indexed dataset",
)


async def update_dataset_version_metric(
    dataset_name: str,
    index: str,
    provider: SearchProvider,
) -> None:
    """Refresh the indexed_dataset_version_time gauge for a single dataset.

    The version time is derived from the newest last_seen value found in
    *index*; if the index holds no such value, the gauge is left untouched
    and a warning is logged instead.
    """
    max_ts = await provider.get_index_max_date(index, "last_seen")
    if max_ts is None:
        log.warning(
            "No last_seen value found in index", dataset=dataset_name, index=index
        )
        return
    _indexed_dataset_version_time.set(max_ts, {"dataset": dataset_name})
    as_iso = datetime.fromtimestamp(max_ts, tz=timezone.utc).isoformat()
    log.info(
        "Updated dataset version metric",
        dataset=dataset_name,
        timestamp=as_iso,
    )
15 changes: 15 additions & 0 deletions yente/provider/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@


class SearchProvider(object):
async def get_index_max_date(self, index: str, field: str) -> Optional[int]:
"""Return the max value of a date field across all documents in the index,
as a Unix timestamp in seconds. Returns None if the index is empty."""
response = await self.search(
index=index,
query={"match_all": {}},
size=0,
aggregations={"max_date": {"max": {"field": field}}},
)
value_ms: Optional[float] = (
response.get("aggregations", {}).get("max_date", {}).get("value")
)
if value_ms is None:
return None
return int(value_ms / 1000.0)

    async def close(self) -> None:
        # Abstract hook: concrete providers must release their client/connection here.
        raise NotImplementedError
Expand Down
61 changes: 37 additions & 24 deletions yente/search/indexer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import threading
from typing import Any, AsyncGenerator, AsyncIterable, Dict, List, Set
from typing import Any, AsyncGenerator, AsyncIterable, Dict, List, Optional, Set
from followthemoney import registry
from followthemoney.exc import FollowTheMoneyException
from followthemoney.names import entity_names
Expand Down Expand Up @@ -37,12 +37,44 @@
get_system_version,
)
from yente.data.util import entity_weak_names, expand_dates, index_symbols
from yente.data.metrics import update_dataset_version_metric


# Structured logger for this module.
log = get_logger(__name__)
# NOTE(review): presumably serializes concurrent index update runs — the
# lock's usage is outside this chunk; confirm before relying on it.
lock = threading.Lock()


def build_entity_action(
    index: str,
    op_code: str,
    entity_data: Dict[str, Any],
    datasets: Set[str],
    dataset: Dataset,
) -> Optional[Dict[str, Any]]:
    """Translate one raw entity operation into a bulk-indexing action.

    "DEL" ops become delete actions keyed on the entity id; anything else
    becomes an index action carrying the built document. Returns None when
    the entity payload fails FollowTheMoney validation.
    """
    if op_code == "DEL":
        # A deletion needs only the document id — no entity parsing.
        return {
            "_op_type": "delete",
            "_index": index,
            "_id": entity_data["id"],
        }
    try:
        ent = Entity.from_dict(entity_data)
        # Keep only the datasets this index knows about; fall back to the
        # updater's own dataset when the intersection comes up empty.
        ent.datasets = ent.datasets.intersection(datasets)
        if not ent.datasets:
            ent.datasets.add(dataset.name)
        if dataset.ns is not None:
            ent = dataset.ns.apply(ent)
        return {
            "_index": index,
            "_id": ent.id,
            "_source": build_indexable_entity_doc(ent),
        }
    except FollowTheMoneyException as err:
        log.error("Invalid entity: %s" % err, data=entity_data)
        return None


async def iter_entity_docs(
updater: DatasetUpdater, index: str
) -> AsyncGenerator[Dict[str, Any], None]:
Expand All @@ -56,29 +88,9 @@ async def iter_entity_docs(
op_code = data["op"]
idx += 1
ops[op_code] += 1
if op_code == "DEL":
yield {
"_op_type": "delete",
"_index": index,
"_id": data["entity"]["id"],
}
continue

try:
entity = Entity.from_dict(data["entity"])
entity.datasets = entity.datasets.intersection(datasets)
if not len(entity.datasets):
entity.datasets.add(dataset.name)
if dataset.ns is not None:
entity = dataset.ns.apply(entity)

yield {
"_index": index,
"_id": entity.id,
"_source": build_indexable_entity_doc(entity),
}
except FollowTheMoneyException as exc:
log.error("Invalid entity: %s" % exc, data=data)
action = build_entity_action(index, op_code, data["entity"], datasets, dataset)
if action is not None:
yield action
log.info(
"Indexed %d entities" % idx,
added=ops["ADD"],
Expand Down Expand Up @@ -266,6 +278,7 @@ async def refresh_lock_iterator(
message=f"Alias {alias} prefixed {dataset_prefix} now points to {next_index}",
)
log.info("Index is now aliased to: %s" % alias, index=next_index)
await update_dataset_version_metric(dataset.name, next_index, provider)


async def delete_old_indices(provider: SearchProvider, catalog: Catalog) -> None:
Expand Down
1 change: 1 addition & 0 deletions yente/search/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@


async def sync_dataset_versions(provider: SearchProvider, catalog: Catalog) -> None:
"""Sync dataset index_version for all datasets in the passed catalog."""
for aliased_index in await provider.get_alias_indices(settings.ENTITY_INDEX):
try:
index_info = parse_index_name(aliased_index)
Expand Down
Loading