Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rust/lance-core/src/cache/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ pub trait CacheBackend: Send + Sync + std::fmt::Debug {
/// Remove all entries whose prefix starts with the given string.
async fn invalidate_prefix(&self, prefix: &str);

/// Remove only the entry identified by `key`, if present.
async fn invalidate_entry(&self, key: &InternalCacheKey);

/// Remove all entries.
async fn clear(&self);

Expand Down
183 changes: 183 additions & 0 deletions rust/lance-core/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,18 @@ impl LanceCache {
self.cache.invalidate_prefix(&full_prefix).await;
}

/// Invalidate one typed entry.
pub async fn invalidate_with_key<K: CacheKey>(&self, cache_key: &K) {
let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
self.cache.invalidate_entry(&key).await;
}

/// Invalidate one unsized-typed entry.
pub async fn invalidate_unsized_with_key<K: UnsizedCacheKey>(&self, cache_key: &K) {
let key = build_key(&self.prefix, &cache_key.key(), K::type_name());
self.cache.invalidate_entry(&key).await;
}

pub async fn size(&self) -> usize {
self.cache.num_entries().await
}
Expand Down Expand Up @@ -804,6 +816,9 @@ mod tests {
async fn invalidate_prefix(&self, prefix: &str) {
self.map.lock().await.retain(|k, _| !k.starts_with(prefix));
}
async fn invalidate_entry(&self, key: &InternalCacheKey) {
self.map.lock().await.remove(key);
}
async fn clear(&self) {
self.map.lock().await.clear();
}
Expand Down Expand Up @@ -835,6 +850,174 @@ mod tests {
);
}

#[tokio::test]
async fn test_invalidate_with_key_removes_only_target() {
let cache = LanceCache::with_capacity(10_000);
cache
.insert_with_key(&TestKey::<Vec<i32>>::new("a"), Arc::new(vec![1]))
.await;
cache
.insert_with_key(&TestKey::<Vec<i32>>::new("b"), Arc::new(vec![2]))
.await;
// Same key, different type: should survive.
cache
.insert_with_key(&TestKey::<Vec<u8>>::new("a"), Arc::new(vec![3u8]))
.await;
assert_eq!(cache.size().await, 3);

cache
.invalidate_with_key(&TestKey::<Vec<i32>>::new("a"))
.await;

assert!(
cache
.get_with_key(&TestKey::<Vec<i32>>::new("a"))
.await
.is_none()
);
assert!(
cache
.get_with_key(&TestKey::<Vec<i32>>::new("b"))
.await
.is_some(),
"sibling key at same prefix must survive",
);
assert!(
cache
.get_with_key(&TestKey::<Vec<u8>>::new("a"))
.await
.is_some(),
"sibling type_name at same key must survive",
);
assert_eq!(cache.size().await, 2);
}

#[tokio::test]
async fn test_invalidate_with_key_respects_prefix_scope() {
let base = LanceCache::with_capacity(10_000);
let ns_a = base.with_key_prefix("a");
let ns_b = base.with_key_prefix("b");

base.insert_with_key(&TestKey::<Vec<i32>>::new("k"), Arc::new(vec![1]))
.await;
ns_a.insert_with_key(&TestKey::<Vec<i32>>::new("k"), Arc::new(vec![2]))
.await;
ns_b.insert_with_key(&TestKey::<Vec<i32>>::new("k"), Arc::new(vec![3]))
.await;
assert_eq!(base.size().await, 3);

ns_a.invalidate_with_key(&TestKey::<Vec<i32>>::new("k"))
.await;

assert!(
ns_a.get_with_key(&TestKey::<Vec<i32>>::new("k"))
.await
.is_none()
);
assert!(
ns_b.get_with_key(&TestKey::<Vec<i32>>::new("k"))
.await
.is_some(),
"sibling sub-prefix must not be invalidated",
);
assert!(
base.get_with_key(&TestKey::<Vec<i32>>::new("k"))
.await
.is_some(),
"parent-prefix entry must not be invalidated",
);
assert_eq!(base.size().await, 2);
}

#[tokio::test]
async fn test_invalidate_with_key_keeps_nested_prefix_subcache() {
// Parent-key invalidation must not clear child prefixes.
let base = LanceCache::with_capacity(10_000);
let nested = base.with_key_prefix("ns");

base.insert_with_key(&TestKey::<Vec<i32>>::new("k"), Arc::new(vec![1]))
.await;
nested
.insert_with_key(&TestKey::<Vec<i32>>::new("k"), Arc::new(vec![2]))
.await;
assert_eq!(base.size().await, 2);

base.invalidate_with_key(&TestKey::<Vec<i32>>::new("k"))
.await;

assert!(
base.get_with_key(&TestKey::<Vec<i32>>::new("k"))
.await
.is_none()
);
assert!(
nested
.get_with_key(&TestKey::<Vec<i32>>::new("k"))
.await
.is_some(),
"nested subcache entry must survive parent-scope invalidation",
);
assert_eq!(base.size().await, 1);
}

#[tokio::test]
async fn test_invalidate_with_key_missing_is_noop() {
let cache = LanceCache::with_capacity(10_000);
cache
.insert_with_key(&TestKey::<Vec<i32>>::new("present"), Arc::new(vec![1]))
.await;

cache
.invalidate_with_key(&TestKey::<Vec<i32>>::new("absent"))
.await;

assert!(
cache
.get_with_key(&TestKey::<Vec<i32>>::new("present"))
.await
.is_some()
);
assert_eq!(cache.size().await, 1);
}

#[tokio::test]
async fn test_invalidate_unsized_with_key_removes_only_target() {
#[derive(Debug, DeepSizeOf)]
struct MyType(i32);

trait MyTrait: DeepSizeOf + Send + Sync + std::any::Any {}
impl MyTrait for MyType {}

let cache = LanceCache::with_capacity(10_000);
let a: Arc<dyn MyTrait> = Arc::new(MyType(1));
let b: Arc<dyn MyTrait> = Arc::new(MyType(2));
cache
.insert_unsized_with_key(&TestUnsizedKey::<dyn MyTrait>::new("a"), a)
.await;
cache
.insert_unsized_with_key(&TestUnsizedKey::<dyn MyTrait>::new("b"), b)
.await;
assert_eq!(cache.size().await, 2);

cache
.invalidate_unsized_with_key(&TestUnsizedKey::<dyn MyTrait>::new("a"))
.await;

assert!(
cache
.get_unsized_with_key(&TestUnsizedKey::<dyn MyTrait>::new("a"))
.await
.is_none()
);
assert!(
cache
.get_unsized_with_key(&TestUnsizedKey::<dyn MyTrait>::new("b"))
.await
.is_some()
);
assert_eq!(cache.size().await, 1);
}

#[tokio::test]
async fn test_get_or_insert_dedup() {
use std::sync::atomic::AtomicUsize;
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-core/src/cache/moka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl CacheBackend for MokaCacheBackend {
.expect("Cache configured correctly");
}

async fn invalidate_entry(&self, key: &InternalCacheKey) {
self.cache.invalidate(key).await;
}

async fn clear(&self) {
self.cache.invalidate_all();
self.cache.run_pending_tasks().await;
Expand Down
5 changes: 5 additions & 0 deletions rust/lance/src/dataset/tests/dataset_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,11 @@ mod fts_serializing_backend {
self.passthrough.invalidate_prefix(prefix).await;
}

async fn invalidate_entry(&self, key: &InternalCacheKey) {
self.serialized.lock().await.remove(key);
self.passthrough.invalidate_entry(key).await;
}

async fn clear(&self) {
self.serialized.lock().await.clear();
self.passthrough.clear().await;
Expand Down
Loading
Loading