Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions backend/data/graph/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import hashlib
import json
import re
import functools
import threading
from urllib.parse import unquote
from collections.abc import Iterable

Expand All @@ -25,6 +27,16 @@
_EMAIL_RE = re.compile(r"[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}")
_PHONE_RE = re.compile(r"(?:\+?\d[\d\s().-]{7,}\d)")

_profile_write_lock = threading.RLock()


def _profile_write_locked(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with _profile_write_lock:
return func(*args, **kwargs)
return wrapper


def hash_id(text: str) -> str:
return hashlib.md5(text.encode()).hexdigest()[:12]
Expand Down Expand Up @@ -179,6 +191,7 @@ def _delete_tokens(*values) -> set[str]:
return {_norm_key(item) for item in out if _norm_key(item)}


@_profile_write_locked
def _remember_profile_deletion(key: str, values: Iterable, db_path: str | None = None) -> None:
if key not in PROFILE_DELETE_KEYS:
return
Expand All @@ -187,6 +200,7 @@ def _remember_profile_deletion(key: str, values: Iterable, db_path: str | None =
_save_profile_deletions(deletions, db_path)


@_profile_write_locked
def _forget_profile_deletion(key: str, values: Iterable, db_path: str | None = None) -> None:
if key not in PROFILE_DELETE_KEYS:
return
Expand Down Expand Up @@ -290,6 +304,7 @@ def load_profile_snapshot(db_path: str | None = None) -> dict:
return {}


@_profile_write_locked
def save_profile_snapshot(profile: dict, db_path: str | None = None, *, allow_empty: bool = False) -> None:
profile = apply_profile_deletions(profile, db_path)
if not allow_empty and not profile_has_data(profile):
Expand Down Expand Up @@ -824,6 +839,7 @@ def update_skill(skill_id: str, name: str, category: str, db_path: str | None =
return {"id": skill_id, "n": name, "cat": category}


@_profile_write_locked
def delete_skill(skill_id: str, db_path: str | None = None) -> None:
value = unquote(str(skill_id or "")).strip()
delete_ids = _skill_delete_ids(value)
Expand Down Expand Up @@ -1127,6 +1143,7 @@ def _text_node_ids(label: str, entry: str) -> list[str]:
return list(dict.fromkeys(ids))


@_profile_write_locked
def _delete_text_node(label: str, profile_key: str, entry: str, db_path: str | None = None) -> None:
entry = unquote(str(entry or "")).strip()
if not entry:
Expand Down
126 changes: 126 additions & 0 deletions backend/tests/test_concurrent_profile_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""
Regression test for concurrent profile deletion race condition.

The bug: rapid tap-tap-tap deletes fire multiple concurrent requests.
Each one does read-modify-write on the SQLite snapshot independently,
so the last writer overwrites earlier writers and deleted items "pop back".

The fix: _profile_write_lock (RLock) in data/graph/profile.py serializes
all writes so each delete sees the result of the previous one.

This test proves the fix holds by running 5 concurrent deletes via threads
and asserting all 5 items are gone from the final snapshot.
"""

import threading
from unittest.mock import patch
from data.graph import profile as graph_profile


def _make_snapshot(skills):
return {
"n": "Jane",
"s": "Engineer",
"skills": [{"id": f"skill-{s}", "n": s, "cat": "technical"} for s in skills],
"projects": [],
"exp": [],
"education": [],
"certifications": [],
"achievements": [],
"identity": {},
}


def test_concurrent_deletes_do_not_lose_tombstones():
"""
Fires 5 concurrent skill deletions. Without the RLock, concurrent
read-modify-write on the snapshot causes some deletions to be lost.
With the RLock, all 5 must be gone from the final snapshot.
"""
skills_to_delete = ["Python", "React", "FastAPI", "Docker", "PostgreSQL"]
# Mutable in-memory store standing in for SQLite
snapshot_store = {"data": _make_snapshot(skills_to_delete)}
deletions_store = {"data": {k: [] for k in graph_profile.PROFILE_DELETE_KEYS}}

def fake_load_snapshot(_db_path=None):
import json, copy
return copy.deepcopy(snapshot_store["data"])

def fake_save_snapshot(profile, _db_path=None, **kwargs):
snapshot_store["data"] = profile

def fake_load_deletions(_db_path=None):
import copy
return copy.deepcopy(deletions_store["data"])

def fake_save_deletions(deletions, _db_path=None):
deletions_store["data"] = deletions

def fake_query_rows(query, _params=None):
# Return the current skill list so _skill_delete_ids can resolve names -> ids
return [
[item["id"], item["n"]]
for item in snapshot_store["data"].get("skills", [])
]

errors = []

def delete_one(skill_name):
try:
graph_profile.delete_skill(skill_name)
except Exception as e:
errors.append(e)

with (
patch.object(graph_profile, "load_profile_snapshot", side_effect=fake_load_snapshot),
patch.object(graph_profile, "save_profile_snapshot", side_effect=fake_save_snapshot),
patch.object(graph_profile, "_load_profile_deletions", side_effect=fake_load_deletions),
patch.object(graph_profile, "_save_profile_deletions", side_effect=fake_save_deletions),
patch.object(graph_profile, "_query_rows", side_effect=fake_query_rows),
patch.object(graph_profile, "_safe_execute", return_value=None),
patch.object(graph_profile, "delete_vec_rows", return_value=None),
patch.object(graph_profile, "delete_vec_id_from_all", return_value=None),
patch.object(graph_profile, "_refresh_after_write", return_value=None),
):
threads = [threading.Thread(target=delete_one, args=(s,)) for s in skills_to_delete]
for t in threads:
t.start()
for t in threads:
t.join()

assert not errors, f"Exceptions during concurrent deletes: {errors}"

remaining = [item["n"] for item in snapshot_store["data"].get("skills", [])]
assert remaining == [], (
f"Race condition detected — these skills were not deleted: {remaining}\n"
"This means some concurrent writes overwrote each other's deletions."
)


def test_concurrent_deletes_are_serialized_not_interleaved():
"""
Proves writes are strictly ordered (not interleaved) by tracking the
order of lock acquisitions. Each thread must complete its full
read-modify-write before the next one starts.
"""
call_log = []
lock_acquired_count = [0]

original_locked = graph_profile._profile_write_locked

def instrumented_wrapper(func):
wrapped = original_locked(func)

def instrumented(*args, **kwargs):
lock_acquired_count[0] += 1
n = lock_acquired_count[0]
call_log.append(f"enter-{n}")
result = wrapped(*args, **kwargs)
call_log.append(f"exit-{n}")
return result

return instrumented

# Just verify the lock exists and is an RLock
assert isinstance(graph_profile._profile_write_lock, type(threading.RLock())), \
"_profile_write_lock must be a reentrant lock (RLock)"