Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ anyhow = "1"
axum = "0.8"
async-trait = "0.1"
base64 = "0.22"
futures-core = "0.3"
bincode = "1.3"
bitflags = { version = "2.10", features = ["serde"] }
blake3 = "1.8"
Expand Down
1 change: 1 addition & 0 deletions data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ progress_tracking = { path = "../progress_tracking" }
utils = { path = "../utils" }
xet_runtime = { path = "../xet_runtime" }

futures = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion data/src/bin/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn clean(mut reader: impl Read, mut writer: impl Write, size: u64) -> Resu
FileUploadSession::new(TranslatorConfig::local_config(std::env::current_dir()?)?.into(), None).await?;

let mut size_read = 0;
let mut handle = translator.start_clean(None, size, None).await;
let mut handle = translator.start_clean(None, Some(size), None).await;

loop {
let bytes = reader.read(&mut read_buf)?;
Expand Down
53 changes: 49 additions & 4 deletions data/src/data_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ pub async fn upload_bytes_async(

let semaphore = XetRuntime::current().common().file_ingestion_semaphore.clone();
let upload_session = FileUploadSession::new(config.into(), progress_updater).await?;
let clean_futures = file_contents.into_iter().map(|blob| {
let clean_futures = file_contents.into_iter().enumerate().map(|(i, blob)| {
let upload_session = upload_session.clone();
async move { clean_bytes(upload_session, blob).await.map(|(xf, _metrics)| xf) }
let name: Arc<str> = format!("{i}").into();
async move { clean_bytes(upload_session, blob, Some(name)).await.map(|(xf, _metrics)| xf) }
.instrument(info_span!("clean_task"))
});
let files = run_constrained_with_semaphore(clean_futures, semaphore).await?;
Expand Down Expand Up @@ -239,8 +240,9 @@ pub async fn download_async(
/// Chunk, deduplicate, and upload an in-memory byte buffer through the given
/// upload session.
///
/// * `processor` — the shared upload session that performs cleaning.
/// * `bytes` — the complete file contents; because the whole buffer is in
///   hand, the total size is known up front and passed as `Some(..)`.
/// * `name` — optional label for progress/completion tracking (may be a
///   synthetic index when the caller has no real filename).
///
/// Returns the resulting [`XetFileInfo`] together with the deduplication
/// metrics gathered while cleaning.
pub async fn clean_bytes(
    processor: Arc<FileUploadSession>,
    bytes: Vec<u8>,
    name: Option<Arc<str>>,
) -> errors::Result<(XetFileInfo, DeduplicationMetrics)> {
    // Size is known ahead of time for an in-memory buffer, so register it
    // with the completion tracker via Some(..). (The stale pre-refactor call
    // that passed a bare u64 has been removed — it no longer matches the
    // `start_clean` signature.)
    let mut handle = processor.start_clean(name, Some(bytes.len() as u64), None).await;
    handle.add_data(&bytes).await?;
    handle.finish().await
}
Expand All @@ -261,7 +263,11 @@ pub async fn clean_file(
let mut buffer = vec![0u8; u64::min(filesize, *xet_config().data.ingestion_block_size) as usize];

let mut handle = processor
.start_clean(Some(filename.as_ref().to_string_lossy().into()), filesize, Sha256::from_hex(sha256.as_ref()).ok())
.start_clean(
Some(filename.as_ref().to_string_lossy().into()),
Some(filesize),
Sha256::from_hex(sha256.as_ref()).ok(),
)
.await;

loop {
Expand Down Expand Up @@ -615,4 +621,43 @@ mod tests {
assert_eq!(file_info1.file_size(), file_info2.file_size());
assert_eq!(file_info1.file_size(), file_info3.file_size());
}

#[tokio::test]
async fn test_upload_bytes() {
let temp_dir = tempdir().unwrap();
let endpoint = format!("local://{}", temp_dir.path().display());

let contents: Vec<Vec<u8>> = vec![
vec![],
b"Hello, World!".to_vec(),
(0..1_000_000).map(|i| (i % 256) as u8).collect(),
];

// Upload all as bytes.
let file_infos = upload_bytes_async(contents.clone(), Some(endpoint.clone()), None, None, None, "test".into())
.await
.unwrap();

assert_eq!(file_infos.len(), contents.len());
for (info, content) in file_infos.iter().zip(&contents) {
assert_eq!(info.file_size(), content.len() as u64);
assert!(!info.hash().is_empty());
}
// Different contents produce different hashes.
assert_ne!(file_infos[0].hash(), file_infos[2].hash());

// Download to files and verify.
let download_dir = tempdir().unwrap();
let file_infos_with_paths: Vec<_> = file_infos
.into_iter()
.enumerate()
.map(|(i, info)| (info, download_dir.path().join(format!("{i}")).to_str().unwrap().to_string()))
.collect();
let paths = download_async(file_infos_with_paths, Some(endpoint), None, None, None, "test".into())
.await
.unwrap();
for (path, expected) in paths.iter().zip(&contents) {
assert_eq!(std::fs::read(path).unwrap(), *expected);
}
}
}
2 changes: 1 addition & 1 deletion data/src/file_cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl SingleFileCleaner {
let (file_hash, remaining_file_data, deduplication_metrics) =
self.dedup_manager_fut.await?.finalize(Some(metadata_ext));

let file_info = XetFileInfo::new(file_hash.hex(), deduplication_metrics.total_bytes);
let file_info = XetFileInfo::with_sha256(file_hash.hex(), deduplication_metrics.total_bytes, sha256.hex());

// Let's check some things that should be invariants
#[cfg(debug_assertions)]
Expand Down
9 changes: 6 additions & 3 deletions data/src/file_upload_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ impl FileUploadSession {

// Get a new file id for the completion tracking. This also registers the size against the total bytes
// of the file.
let file_id = self.completion_tracker.register_new_file(file_name.clone(), file_size).await;
let file_id = self
.completion_tracker
.register_new_file(file_name.clone(), Some(file_size))
.await;

// Now, spawn a task
let ingestion_concurrency_limiter = XetRuntime::current().common().file_ingestion_semaphore.clone();
Expand Down Expand Up @@ -253,7 +256,7 @@ impl FileUploadSession {
pub async fn start_clean(
self: &Arc<Self>,
file_name: Option<Arc<str>>,
size: u64,
size: Option<u64>,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When writing using opendal, the size is not known ahead of time, so in order to avoid collecting the data I had to implement this workaround. Ideally we could turn off progress tracking, but it seems like its API is tightly coupled with the general upload process.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turning it off is done by simply using the NoOp version, so it's possible to do. We should definitely take that situation into account.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks for the hint! Trying it out.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a closer look. While I can turn off the actual reporting by passing the no-op implementation, CompletionTracker is still running and verifying (in dev mode) the upload, so I ended up making total_bytes optional — essentially signaling that total_bytes is not known before uploading.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I had to handle the known/unknown case in the download progress tracking in a parallel PR. There it explicitly tries to update the total as more data is streamed if it's not known. The issue with making it optional is that reporting functions elsewhere use the ratio heavily, which would be problematic. Let me put up a PR quick to add this same feature to the upload tracking.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put a PR up for this at #651. This should satisfy your use case and satisfy the needed invariants for the reporting UX in both cases.

sha256: Option<Sha256>,
) -> SingleFileCleaner {
// Get a new file id for the completion tracking
Expand Down Expand Up @@ -564,7 +567,7 @@ mod tests {
.unwrap();

let mut cleaner = upload_session
.start_clean(Some("test".into()), read_data.len() as u64, None)
.start_clean(Some("test".into()), Some(read_data.len() as u64), None)
.await;

// Read blocks from the source file and hand them to the cleaning handle
Expand Down
1 change: 1 addition & 0 deletions data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod prometheus_metrics;
mod remote_client_interface;
mod sha256;
mod shard_interface;
pub mod streaming;
mod xet_file;

// Reexport this one for now
Expand Down
Loading
Loading