Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 129 additions & 22 deletions core-relations/src/containers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ use std::{
any::{Any, TypeId},
hash::{Hash, Hasher},
ops::Deref,
sync::{Arc, Mutex},
};

use crate::numeric_id::{DenseIdMap, IdVec, NumericId, define_id};
use crossbeam_queue::SegQueue;
use dashmap::SharedValue;
use hashbrown::HashSet;
use rayon::{
iter::{ParallelBridge, ParallelIterator},
prelude::*,
Expand Down Expand Up @@ -201,7 +203,116 @@ struct ContainerEnv<C: Eq + Hash> {
to_id: DashMap<C, Value>,
to_container: DashMap<Value, (usize /* hash code */, usize /* map */)>,
/// Map from a Value to the set of ids of containers that contain that value.
val_index: DashMap<Value, IndexSet<Value>>,
val_index: LazyContainerIdx,
}
#[derive(Clone)]
struct LazyContainerIdx {
val_index: DashMap<Value, HashSet<Value>>,
// keys and value to insert
// if user want to insert same value for all keys in IndexSet<Value>, LazyMap will put them
// in pending_insert and do the insertion for single key and remove this key in pending_insert when user want to read LazyMap
pending_operations: Arc<Mutex<Vec<(HashSet<Value>, InsertOrRemove)>>>,
}
/// A deferred update to the set stored under each key of a pending key set.
enum InsertOrRemove {
/// Add the value to the set stored under the key (creating the set if needed).
Insert(Value),
/// Remove the value from the set stored under the key, if that set exists.
Remove(Value),
}

/// Threshold on the number of keys in a bulk update: updates touching fewer
/// keys than this are applied to the index immediately, larger ones are queued.
/// A queued key set that has shrunk below this size is flushed in one go.
const LAZY_BOUND: usize = 30;
use dashmap::mapref::one::Ref;
impl LazyContainerIdx {
/// Creates a new, empty `LazyMapOfIndexSet`.
pub fn new() -> Self {
Self {
val_index: DashMap::default(),
pending_operations: Default::default(),
}
}

/// Returns a reference to the value corresponding to the key.
pub fn get(&mut self, key: &Value) -> Option<Ref<'_, Value, HashSet<Value>>> {
self.flush_pending_operations_for_key(key);
self.val_index.get(key)
}

/// Lazily inserts a value for all keys in the given index set.
/// The actual insertion will be performed when the map is next accessed.
pub fn insert_for_all_keys(&self, keys: HashSet<Value>, value: Value) {
if keys.len() < LAZY_BOUND {
for key in keys {
self.val_index.entry(key).or_default().insert(value);
}
} else {
self.pending_operations
.lock()
.unwrap()
.push((keys, InsertOrRemove::Insert(value)));
}
}

/// Lazily removes a value for all keys in the given index set.
pub fn remove_for_all_keys(&self, keys: HashSet<Value>, value: Value) {
if keys.len() < LAZY_BOUND {
for key in keys {
if let Some(mut pending_keys) = self.val_index.get_mut(&key) {
pending_keys.remove(&value);
}
}
} else {
self.pending_operations
.lock()
.unwrap()
.push((keys, InsertOrRemove::Remove(value)));
}
}

/// Flushes all pending lazy insertions to the underlying map.
fn flush_pending_operations_for_key(&self, key: &Value) {
let mut pending_ops = self.pending_operations.lock().unwrap();
let mut flush_whole_set = false;
for (keys, op) in pending_ops.iter_mut() {
if keys.contains(key) {
// if the length of keys set is less than LAZY_BOUND just flush the whole set.
if keys.len() < LAZY_BOUND {
// flush all keys in set
for key in keys.iter() {
match op {
InsertOrRemove::Insert(v) => {
self.val_index.entry(*key).or_default().insert(*v);
}
InsertOrRemove::Remove(v) => {
if let Some(mut pending_keys) = self.val_index.get_mut(key) {
pending_keys.remove(v);
}
}
}
}
flush_whole_set = true;
} else {
match op {
InsertOrRemove::Insert(v) => {
self.val_index.entry(*key).or_default().insert(*v);
}
InsertOrRemove::Remove(v) => {
if let Some(mut pending_keys) = self.val_index.get_mut(key) {
pending_keys.remove(v);
}
}
}
keys.remove(key);
}
}
}
if flush_whole_set {
pending_ops.retain(|(keys, _ops)| !keys.is_empty());
}
}
}

impl Default for LazyContainerIdx {
fn default() -> Self {
Self::new()
}
}

impl<C: ContainerValue> DynamicContainerEnv for ContainerEnv<C> {
Expand Down Expand Up @@ -242,7 +353,7 @@ impl<C: ContainerValue> ContainerEnv<C> {
counter,
to_id: DashMap::default(),
to_container: DashMap::default(),
val_index: DashMap::default(),
val_index: Default::default(),
}
}

Expand Down Expand Up @@ -274,9 +385,8 @@ impl<C: ContainerValue> ContainerEnv<C> {
dashmap::Entry::Vacant(vac) => {
// Common case: insert the mapping in to_id and update the index.
vac.insert(value);
for val in container.iter() {
self.val_index.entry(val).or_default().insert(value);
}
self.val_index
.insert_for_all_keys(container.iter().collect(), value);
value
}
dashmap::Entry::Occupied(occ) => {
Expand All @@ -302,18 +412,16 @@ impl<C: ContainerValue> ContainerEnv<C> {
self.to_container.remove(&old_val);
self.to_container.insert(result, (hc as usize, target_map));
*occ.get_mut() = result;
for val in occ.key().iter() {
let mut index = self.val_index.entry(val).or_default();
index.swap_remove(&old_val);
index.insert(result);
}
self.val_index
.remove_for_all_keys(occ.key().iter().collect(), old_val);
self.val_index
.insert_for_all_keys(occ.key().iter().collect(), result);
}
}
dashmap::Entry::Vacant(vacant_entry) => {
self.to_container.insert(value, (hc as usize, target_map));
for val in vacant_entry.key().iter() {
self.val_index.entry(val).or_default().insert(value);
}
self.val_index
.insert_for_all_keys(vacant_entry.key().iter().collect(), value);
vacant_entry.insert(value);
}
}
Expand Down Expand Up @@ -349,7 +457,7 @@ impl<C: ContainerValue> ContainerEnv<C> {
let Some(ids) = self.val_index.get(&row[0]) else {
continue;
};
to_rebuild.extend(&*ids);
to_rebuild.extend(ids.iter());
}
for id in to_rebuild {
let Some((hc, target_map)) = self.to_container.get(&id).map(|x| *x) else {
Expand Down Expand Up @@ -500,18 +608,17 @@ impl<C: ContainerValue> ContainerEnv<C> {
self.to_container.remove(&old_val);
self.to_container.insert(result, (hc as usize, target_map));
*val_slot.get_mut() = result;
for val in container.iter() {
let mut index = self.val_index.entry(val).or_default();
index.swap_remove(&old_val);
index.insert(result);
}
self.val_index
.remove_for_all_keys(container.iter().collect(), old_val);
self.val_index
.insert_for_all_keys(container.iter().collect(), result);
}
}
Err(slot) => {
self.to_container.insert(val, (hc as usize, target_map));
for v in container.iter() {
self.val_index.entry(v).or_default().insert(val);
}
self.val_index
.insert_for_all_keys(container.iter().collect(), val);

// SAFETY: We just got this slot from `find_or_find_insert_slot`
// and we have not mutated the map at all since then.
unsafe {
Expand Down