Skip to content

Commit 68340ee

Browse files
committed
feat(sdk): Use the in-memory and in-store deduplicator.
This patch replaces `ThreadEventCacheStateLockWriteGuard::filter_duplicate_events` with `deduplicator::filter_duplicate_events`. First, this function is shared between multiple event caches. Second, it deduplicates events against both the in-memory and the in-store state. The calls to `ThreadEventCacheStateLockWriteGuard::remove_events` have been updated to pass the in-store duplicated events.
1 parent aad7a1b commit 68340ee

File tree

4 files changed

+55
-70
lines changed

4 files changed

+55
-70
lines changed

crates/matrix-sdk/src/event_cache/caches/room/state.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,6 +1010,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
10101010
let RoomEventCacheState {
10111011
room_id,
10121012
weak_room,
1013+
own_user_id,
10131014
store,
10141015
linked_chunk_update_sender,
10151016
threads,
@@ -1021,6 +1022,7 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
10211022
let thread_event_cache = ThreadEventCache::new(
10221023
room_id.clone(),
10231024
root_event_id,
1025+
own_user_id.clone(),
10241026
weak_room.clone(),
10251027
store.clone(),
10261028
linked_chunk_update_sender.clone(),

crates/matrix-sdk/src/event_cache/caches/thread/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ mod state;
2020
use std::{fmt, sync::Arc};
2121

2222
use matrix_sdk_base::event_cache::{Event, store::EventCacheStoreLock};
23-
use ruma::{EventId, OwnedEventId, OwnedRoomId};
23+
use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId};
2424
pub(super) use state::LockedThreadEventCacheState;
2525
use tokio::sync::broadcast::{Receiver, Sender};
2626
use tracing::error;
@@ -59,6 +59,7 @@ impl ThreadEventCache {
5959
pub async fn new(
6060
room_id: OwnedRoomId,
6161
thread_id: OwnedEventId,
62+
own_user_id: OwnedUserId,
6263
weak_room: WeakRoom,
6364
store: EventCacheStoreLock,
6465
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
@@ -70,6 +71,7 @@ impl ThreadEventCache {
7071
state: LockedThreadEventCacheState::new(
7172
room_id,
7273
thread_id,
74+
own_user_id,
7375
store,
7476
linked_chunk_update_sender,
7577
)

crates/matrix-sdk/src/event_cache/caches/thread/pagination.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@ use eyeball::SharedObservable;
1818
use eyeball_im::VectorDiff;
1919
use matrix_sdk_base::{
2020
event_cache::{Event, Gap},
21-
linked_chunk::ChunkContent,
21+
linked_chunk::{ChunkContent, LinkedChunkId},
2222
};
2323
use ruma::api::Direction;
2424
use tracing::trace;
2525

2626
pub use super::super::pagination::PaginationStatus;
2727
use super::{
28-
super::super::{
29-
EventCacheError, EventsOrigin, Result, TimelineVectorDiffs,
30-
caches::pagination::{
28+
super::{
29+
super::{
30+
EventCacheError, EventsOrigin, Result, TimelineVectorDiffs,
31+
deduplicator::{DeduplicationOutcome, filter_duplicate_events},
32+
},
33+
pagination::{
3134
BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
3235
},
3336
},
@@ -226,21 +229,31 @@ impl PaginatedCache for ThreadEventCacheWrapper {
226229
let topo_ordered_events = events.iter().cloned().rev().collect::<Vec<_>>();
227230
let new_gap = new_token.map(|token| Gap { token });
228231

229-
let deduplication = state.filter_duplicate_events(topo_ordered_events);
230-
231-
let (events, new_gap) = if deduplication.non_empty_all_duplicates {
232+
let DeduplicationOutcome {
233+
all_events: events,
234+
in_memory_duplicated_event_ids,
235+
in_store_duplicated_event_ids,
236+
non_empty_all_duplicates: all_duplicates,
237+
} = filter_duplicate_events(
238+
&state.state.own_user_id,
239+
&state.store,
240+
LinkedChunkId::Thread(&state.state.room_id, &state.state.thread_id),
241+
state.thread_linked_chunk(),
242+
topo_ordered_events,
243+
)
244+
.await?;
245+
246+
let (events, new_gap) = if all_duplicates {
232247
// If all events are duplicates, we don't need to do anything; ignore
233248
// the new events and the new gap.
234249
(Vec::new(), None)
235250
} else {
236-
assert!(
237-
deduplication.in_store_duplicated_event_ids.is_empty(),
238-
"persistent storage for threads is not implemented yet"
239-
);
240-
state.remove_events(deduplication.in_memory_duplicated_event_ids, vec![]).await?;
251+
state
252+
.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
253+
.await?;
241254

242255
// Keep events and the gap.
243-
(deduplication.all_events, new_gap)
256+
(events, new_gap)
244257
};
245258

246259
// Add the paginated events to the thread chunk.

crates/matrix-sdk/src/event_cache/caches/thread/state.rs

Lines changed: 24 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::BTreeSet;
16-
1715
use eyeball_im::VectorDiff;
1816
use matrix_sdk_base::{
1917
event_cache::{
@@ -25,14 +23,14 @@ use matrix_sdk_base::{
2523
},
2624
};
2725
use matrix_sdk_common::executor::spawn;
28-
use ruma::{OwnedEventId, OwnedRoomId};
26+
use ruma::{OwnedEventId, OwnedRoomId, OwnedUserId};
2927
use tokio::sync::broadcast::Sender;
3028
use tracing::{debug, error, instrument};
3129

3230
use super::super::{
3331
super::{
3432
EventCacheError, EventsOrigin, Result,
35-
deduplicator::DeduplicationOutcome,
33+
deduplicator::{DeduplicationOutcome, filter_duplicate_events},
3634
persistence::{load_linked_chunk_metadata, send_updates_to_store},
3735
},
3836
TimelineVectorDiffs,
@@ -43,11 +41,14 @@ use super::super::{
4341

4442
pub struct ThreadEventCacheState {
4543
/// The room owning this thread.
46-
room_id: OwnedRoomId,
44+
pub room_id: OwnedRoomId,
4745

4846
/// The ID of the thread root event, which is the first event in the thread
4947
/// (and eventually the first in the linked chunk).
50-
thread_id: OwnedEventId,
48+
pub thread_id: OwnedEventId,
49+
50+
/// The user's own user id.
51+
pub own_user_id: OwnedUserId,
5152

5253
/// Reference to the underlying backing store.
5354
store: EventCacheStoreLock,
@@ -97,6 +98,7 @@ impl LockedThreadEventCacheState {
9798
pub async fn new(
9899
room_id: OwnedRoomId,
99100
thread_id: OwnedEventId,
101+
own_user_id: OwnedUserId,
100102
store: EventCacheStoreLock,
101103
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
102104
) -> Result<Self> {
@@ -159,6 +161,7 @@ impl LockedThreadEventCacheState {
159161
Ok(Self::new_inner(ThreadEventCacheState {
160162
room_id,
161163
thread_id,
164+
own_user_id,
162165
store,
163166
thread_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
164167
linked_chunk,
@@ -276,22 +279,28 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
276279
}
277280

278281
pub async fn handle_sync(&mut self, events: Vec<Event>) -> Result<Vec<VectorDiff<Event>>> {
279-
let deduplication = self.filter_duplicate_events(events);
282+
let DeduplicationOutcome {
283+
all_events: events,
284+
in_memory_duplicated_event_ids,
285+
in_store_duplicated_event_ids,
286+
non_empty_all_duplicates: all_duplicates,
287+
} = filter_duplicate_events(
288+
&self.state.own_user_id,
289+
&self.store,
290+
LinkedChunkId::Thread(&self.state.room_id, &self.state.thread_id),
291+
&self.state.thread_linked_chunk,
292+
events,
293+
)
294+
.await?;
280295

281-
if deduplication.non_empty_all_duplicates {
296+
if all_duplicates {
282297
// If all events are duplicates, we don't need to do anything; ignore
283298
// the new events.
284299
return Ok(Vec::new());
285300
}
286301

287302
// Remove the duplicated events from the thread chunk.
288-
self.remove_events(deduplication.in_memory_duplicated_event_ids, vec![]).await?;
289-
assert!(
290-
deduplication.in_store_duplicated_event_ids.is_empty(),
291-
"persistent storage for threads is not implemented yet"
292-
);
293-
294-
let events = deduplication.all_events;
303+
self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?;
295304

296305
self.state.thread_linked_chunk.push_live_events(None, &events);
297306

@@ -425,45 +434,4 @@ impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
425434
)
426435
.await
427436
}
428-
429-
/// Find duplicates in a thread, until there's persistent storage for
430-
/// those.
431-
///
432-
/// TODO: when persistent storage is implemented for thread, only use
433-
/// the regular `filter_duplicate_events` method.
434-
pub fn filter_duplicate_events(&self, mut new_events: Vec<Event>) -> DeduplicationOutcome {
435-
let mut new_event_ids = BTreeSet::new();
436-
437-
new_events.retain(|event| {
438-
// Only keep events with IDs, and those for which `insert` returns `true`
439-
// (meaning they were not in the set).
440-
event.event_id().is_some_and(|event_id| new_event_ids.insert(event_id))
441-
});
442-
443-
let in_memory_duplicated_event_ids: Vec<_> = self
444-
.state
445-
.thread_linked_chunk
446-
.events()
447-
.filter_map(|(position, event)| {
448-
let event_id = event.event_id()?;
449-
new_event_ids.contains(&event_id).then_some((event_id, position))
450-
})
451-
.collect();
452-
453-
// Right now, there's no persistent storage for threads.
454-
let in_store_duplicated_event_ids = Vec::new();
455-
456-
let at_least_one_event = !new_events.is_empty();
457-
let all_duplicates = (in_memory_duplicated_event_ids.len()
458-
+ in_store_duplicated_event_ids.len())
459-
== new_events.len();
460-
let non_empty_all_duplicates = at_least_one_event && all_duplicates;
461-
462-
DeduplicationOutcome {
463-
all_events: new_events,
464-
in_memory_duplicated_event_ids,
465-
in_store_duplicated_event_ids,
466-
non_empty_all_duplicates,
467-
}
468-
}
469437
}

0 commit comments

Comments
 (0)