Skip to content

Commit c750666

Browse files
markbtfacebook-github-bot
authored andcommitted
cmdlib/sharding: process new repos concurrently in set_shards
Summary: When handling `set_shards`, process up to 20 repos at a time. Because of the mutable reference lifetime of the lock guard, we can't simply use a stream combinator to give us concurrency. Instead we use `FuturesUnordered` and manually add/poll items out of it. Reviewed By: RajivTS Differential Revision: D72176110 fbshipit-source-id: 8daca5eae8c35cef347cd71e07896fdc20352edc
1 parent 12964c6 commit c750666

File tree

1 file changed

+22
-7
lines changed

1 file changed

+22
-7
lines changed

eden/mononoke/cmdlib/sharding/src/facebook.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
* GNU General Public License version 2.
66
*/
77

8-
use std::collections::hash_map::Entry;
98
use std::collections::HashMap;
109
use std::env;
1110
use std::sync::atomic::AtomicBool;
@@ -20,6 +19,7 @@ use futures::future::select;
2019
use futures::future::Either;
2120
use futures::future::FutureExt;
2221
use futures::stream;
22+
use futures::stream::FuturesUnordered;
2323
use futures::stream::StreamExt;
2424
use futures::stream::TryStreamExt;
2525
use sharding_ext::RepoShard;
@@ -500,20 +500,35 @@ impl ShardedProcessHandler {
500500
.await?;
501501

502502
// Assign new repos to this Process based on the incoming Shards.
503+
let mut setups = FuturesUnordered::new();
503504
for (new_repo, new_shard) in new_repo_shards {
504-
if let Entry::Vacant(entry) = guarded_repo_map.entry(new_repo.clone()) {
505+
while setups.len() >= 20 {
506+
// Limit the number of concurrent setups.
507+
let (new_repo, execution_process) = setups
508+
.try_next()
509+
.await?
510+
.ok_or_else(|| anyhow!("Unexpected empty setup"))?;
511+
guarded_repo_map.insert(new_repo, RepoProcess::Execution(execution_process));
512+
}
513+
if !guarded_repo_map.contains_key(&new_repo) {
505514
let setup_process = RepoSetupProcess::new(
506515
new_shard,
507-
new_repo,
516+
new_repo.clone(),
508517
Arc::clone(&self.setup_job),
509518
&self.runtime_handle,
510519
);
511-
let execution_process = setup_process
512-
.execution_process(&self.runtime_handle)
513-
.await?;
514-
entry.insert(RepoProcess::Execution(execution_process));
520+
let setup = async move {
521+
let execution_process = setup_process
522+
.execution_process(&self.runtime_handle)
523+
.await?;
524+
anyhow::Ok((new_repo, execution_process))
525+
};
526+
setups.push(setup.boxed());
515527
}
516528
}
529+
while let Some((new_repo, execution_process)) = setups.try_next().await? {
530+
guarded_repo_map.insert(new_repo, RepoProcess::Execution(execution_process));
531+
}
517532

518533
info!(self.logger, "Completed setup for {} shards", shard_count);
519534

0 commit comments

Comments
 (0)