Skip to content

Commit b0b8949

Browse files
Clara Rull and meta-codesync[bot]
authored and committed
Revert D99820098: Delete dewey_res.bzl
Differential Revision: D99820098. Original commit changeset: bf58ea7fb287. Original Phabricator Diff: D99820098. fbshipit-source-id: d78dc397b47ffa94cd7118932371cabbd2e80719
1 parent 7154992 commit b0b8949

File tree

3 files changed

+96
-100
lines changed

3 files changed

+96
-100
lines changed

eden/mononoke/features/async_requests/worker/src/main.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,7 @@ impl RepoShardedProcess for WorkerProcess {
122122
.add_repo(repo_name)
123123
.await
124124
.with_context(|| format!("Failure in setting up repo {}", repo_name))?;
125-
let repo_id = repo.repo_identity.id();
126-
let repos = vec![repo_id];
125+
let repos = vec![repo.repo_identity.id()];
127126
info!("Completed setup for repos {:?}", repos);
128127

129128
let queue = Arc::new(AsyncMethodRequestQueue::new(
@@ -139,7 +138,6 @@ impl RepoShardedProcess for WorkerProcess {
139138
self.mononoke.clone(),
140139
self.megarepo.clone(),
141140
self.will_exit.clone(),
142-
Some(repo_id),
143141
)
144142
.await?;
145143
Ok(Arc::new(executor))
@@ -313,7 +311,6 @@ fn run_worker_queue(
313311
mononoke.clone(),
314312
megarepo.clone(),
315313
will_exit.clone(),
316-
None, // non-sharded: no specific repo_id
317314
))?
318315
};
319316
runtime.spawn(async move { executor.execute().await });

eden/mononoke/features/async_requests/worker/src/stats.rs

Lines changed: 91 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
* GNU General Public License version 2.
66
*/
77

8+
use std::collections::HashMap;
89
use std::time::Duration;
910

1011
use async_requests::AsyncMethodRequestQueue;
1112
use async_requests::types::RequestStatus;
1213
use context::CoreContext;
1314
use mononoke_api::RepositoryId;
1415
use mononoke_types::Timestamp;
16+
use requests_table::QueueStatsEntry;
1517
use stats::define_stats;
1618
use stats::prelude::*;
1719
use tracing::warn;
@@ -35,19 +37,9 @@ define_stats! {
3537
queue_age_by_repo_and_status: dynamic_singleton_counter("queue.{}.age_s.{}", (repo_id: String, status: String)),
3638
}
3739

38-
/// Report queue stats for this shard's repo only.
39-
///
40-
/// Each shard reports metrics only for its own repo_id. This avoids
41-
/// cross-shard racing on shared counters and prevents stale values
42-
/// when a shard is reassigned: the counter simply goes absent (no
43-
/// data) rather than being stuck at a stale non-zero value. A dead
44-
/// shard detector should alert on absence of data.
45-
///
46-
/// For the non-sharded code path, `repo_id` is `None` and only the
47-
/// global (non-per-repo) counters are reported.
4840
pub(crate) async fn stats_loop(
4941
ctx: &CoreContext,
50-
repo_id: Option<RepositoryId>,
42+
repo_ids: Vec<RepositoryId>,
5143
queue: &AsyncMethodRequestQueue,
5244
) {
5345
loop {
@@ -57,10 +49,8 @@ pub(crate) async fn stats_loop(
5749
Ok(res) => {
5850
process_queue_length_by_status(ctx, &res);
5951
process_queue_age_by_status(ctx, now, &res);
60-
if let Some(repo_id) = &repo_id {
61-
process_queue_length_by_repo(ctx, repo_id, &res);
62-
process_queue_age_by_repo(ctx, repo_id, now, &res);
63-
}
52+
process_queue_length_by_repo_and_status(ctx, &repo_ids, &res);
53+
process_queue_age_by_repo_and_status(ctx, &repo_ids, now, &res);
6454
}
6555
Err(err) => {
6656
STATS::stats_error.add_value(1);
@@ -72,22 +62,51 @@ pub(crate) async fn stats_loop(
7262
}
7363
}
7464

65+
// Keep track of the stats we have already logged. Any missing ones, we will log a 0.
66+
struct Seen {
67+
inner: HashMap<QueueStatsEntry, bool>,
68+
}
69+
70+
impl Seen {
71+
fn new(repo_ids: &Vec<RepositoryId>) -> Self {
72+
let mut seen = HashMap::new();
73+
for status in STATUSES {
74+
for repo_id in repo_ids {
75+
seen.insert(
76+
QueueStatsEntry {
77+
repo_id: Some(repo_id.clone()),
78+
status,
79+
},
80+
false,
81+
);
82+
}
83+
}
84+
Self { inner: seen }
85+
}
86+
87+
fn mark(&mut self, repo_id: Option<RepositoryId>, status: RequestStatus) {
88+
self.inner.insert(QueueStatsEntry { repo_id, status }, true);
89+
}
90+
91+
fn get_missing(&self) -> Vec<QueueStatsEntry> {
92+
self.inner
93+
.iter()
94+
.filter_map(|(entry, seen)| if !*seen { Some(entry) } else { None })
95+
.cloned()
96+
.collect()
97+
}
98+
}
99+
75100
fn process_queue_length_by_status(ctx: &CoreContext, res: &requests_table::QueueStats) {
76-
// Report whatever the DB returns for global stats, then zero out
77-
// any of the 4 statuses that were absent from the result.
78-
let mut seen = [false; 4];
101+
let mut seen = Seen::new(&vec![]);
79102
let stats = &res.queue_length_by_status;
80103
for (status, count) in stats {
81-
if let Some(idx) = status_index(status) {
82-
seen[idx] = true;
83-
}
104+
seen.mark(None, *status);
84105
STATS::queue_length_by_status.set_value(ctx.fb, *count as i64, (status.to_string(),));
85106
}
86107

87-
for (idx, was_seen) in seen.iter().enumerate() {
88-
if !was_seen {
89-
STATS::queue_length_by_status.set_value(ctx.fb, 0, (STATUSES[idx].to_string(),));
90-
}
108+
for entry in seen.get_missing() {
109+
STATS::queue_length_by_status.set_value(ctx.fb, 0, (entry.status.to_string(),));
91110
}
92111
}
93112

@@ -96,93 +115,79 @@ fn process_queue_age_by_status(
96115
now: Timestamp,
97116
res: &requests_table::QueueStats,
98117
) {
99-
let mut seen = [false; 4];
118+
let mut seen = Seen::new(&vec![]);
100119
let stats = &res.queue_age_by_status;
101120
for (status, ts) in stats {
102-
if let Some(idx) = status_index(status) {
103-
seen[idx] = true;
104-
}
121+
seen.mark(None, *status);
105122
let diff = std::cmp::max(now.timestamp_seconds() - ts.timestamp_seconds(), 0);
106123
STATS::queue_age_by_status.set_value(ctx.fb, diff, (status.to_string(),));
107124
}
108125

109-
for (idx, was_seen) in seen.iter().enumerate() {
110-
if !was_seen {
111-
STATS::queue_age_by_status.set_value(ctx.fb, 0, (STATUSES[idx].to_string(),));
112-
}
126+
for entry in seen.get_missing() {
127+
STATS::queue_age_by_status.set_value(ctx.fb, 0, (entry.status.to_string(),));
113128
}
114129
}
115130

116-
/// Report per-repo queue length for this shard's repo only.
117-
/// If the DB returns no data for a status, we report 0.
118-
fn process_queue_length_by_repo(
131+
fn process_queue_length_by_repo_and_status(
119132
ctx: &CoreContext,
120-
repo_id: &RepositoryId,
133+
repo_ids: &Vec<RepositoryId>,
121134
res: &requests_table::QueueStats,
122135
) {
123-
let mut seen = [false; 4];
124-
let repo_id_str = repo_id.to_string();
136+
let mut seen = Seen::new(repo_ids);
125137
let stats = &res.queue_length_by_repo_and_status;
126138
for (entry, count) in stats {
127-
if entry.repo_id.as_ref() == Some(repo_id) {
128-
if let Some(idx) = status_index(&entry.status) {
129-
seen[idx] = true;
130-
}
131-
STATS::queue_length_by_repo_and_status.set_value(
132-
ctx.fb,
133-
*count as i64,
134-
(repo_id_str.clone(), entry.status.to_string()),
135-
);
136-
}
139+
seen.mark(entry.repo_id, entry.status);
140+
STATS::queue_length_by_repo_and_status.set_value(
141+
ctx.fb,
142+
*count as i64,
143+
(
144+
entry.repo_id.unwrap_or(RepositoryId::new(0)).to_string(),
145+
entry.status.to_string(),
146+
),
147+
);
137148
}
138149

139-
for (idx, was_seen) in seen.iter().enumerate() {
140-
if !was_seen {
141-
STATS::queue_length_by_repo_and_status.set_value(
142-
ctx.fb,
143-
0,
144-
(repo_id_str.clone(), STATUSES[idx].to_string()),
145-
);
146-
}
150+
for entry in seen.get_missing() {
151+
STATS::queue_length_by_repo_and_status.set_value(
152+
ctx.fb,
153+
0,
154+
(
155+
entry.repo_id.unwrap_or(RepositoryId::new(0)).to_string(),
156+
entry.status.to_string(),
157+
),
158+
);
147159
}
148160
}
149161

150-
/// Report per-repo queue age for this shard's repo only.
151-
/// If the DB returns no data for a status, we report 0.
152-
fn process_queue_age_by_repo(
162+
fn process_queue_age_by_repo_and_status(
153163
ctx: &CoreContext,
154-
repo_id: &RepositoryId,
164+
repo_ids: &Vec<RepositoryId>,
155165
now: Timestamp,
156166
res: &requests_table::QueueStats,
157167
) {
158-
let mut seen = [false; 4];
159-
let repo_id_str = repo_id.to_string();
168+
let mut seen = Seen::new(repo_ids);
160169
let stats = &res.queue_age_by_repo_and_status;
161170
for (entry, ts) in stats {
162-
if entry.repo_id.as_ref() == Some(repo_id) {
163-
if let Some(idx) = status_index(&entry.status) {
164-
seen[idx] = true;
165-
}
166-
let diff = std::cmp::max(now.timestamp_seconds() - ts.timestamp_seconds(), 0);
167-
STATS::queue_age_by_repo_and_status.set_value(
168-
ctx.fb,
169-
diff,
170-
(repo_id_str.clone(), entry.status.to_string()),
171-
);
172-
}
171+
seen.mark(entry.repo_id, entry.status);
172+
let diff = std::cmp::max(now.timestamp_seconds() - ts.timestamp_seconds(), 0);
173+
STATS::queue_age_by_repo_and_status.set_value(
174+
ctx.fb,
175+
diff,
176+
(
177+
entry.repo_id.unwrap_or(RepositoryId::new(0)).to_string(),
178+
entry.status.to_string(),
179+
),
180+
);
173181
}
174182

175-
for (idx, was_seen) in seen.iter().enumerate() {
176-
if !was_seen {
177-
STATS::queue_age_by_repo_and_status.set_value(
178-
ctx.fb,
179-
0,
180-
(repo_id_str.clone(), STATUSES[idx].to_string()),
181-
);
182-
}
183+
for entry in seen.get_missing() {
184+
STATS::queue_age_by_repo_and_status.set_value(
185+
ctx.fb,
186+
0,
187+
(
188+
entry.repo_id.unwrap_or(RepositoryId::new(0)).to_string(),
189+
entry.status.to_string(),
190+
),
191+
);
183192
}
184193
}
185-
186-
fn status_index(status: &RequestStatus) -> Option<usize> {
187-
STATUSES.iter().position(|s| s == status)
188-
}

eden/mononoke/features/async_requests/worker/src/worker.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ use futures_stats::TimedFutureExt;
4242
use hostname::get_hostname;
4343
use megarepo_api::MegarepoApi;
4444
use mononoke_api::Mononoke;
45+
use mononoke_api::MononokeRepo;
4546
use mononoke_api::Repo;
46-
use mononoke_api::RepositoryId;
4747
use mononoke_macros::mononoke;
4848
use mononoke_types::Timestamp;
4949
use stats::define_stats;
@@ -86,8 +86,6 @@ pub struct AsyncMethodRequestWorker {
8686
will_exit: Arc<AtomicBool>,
8787
limit: Option<usize>,
8888
concurrency_limit: usize,
89-
/// The repo this shard is responsible for. `None` for non-sharded workers.
90-
repo_id: Option<RepositoryId>,
9189
}
9290

9391
impl AsyncMethodRequestWorker {
@@ -98,7 +96,6 @@ impl AsyncMethodRequestWorker {
9896
mononoke: Arc<Mononoke<Repo>>,
9997
megarepo: Arc<MegarepoApi<Repo>>,
10098
will_exit: Arc<AtomicBool>,
101-
repo_id: Option<RepositoryId>,
10299
) -> Result<Self, Error> {
103100
let name = {
104101
let tw_job_cluster = std::env::var("TW_JOB_CLUSTER");
@@ -124,7 +121,6 @@ impl AsyncMethodRequestWorker {
124121
will_exit,
125122
limit: args.request_limit,
126123
concurrency_limit: args.jobs,
127-
repo_id,
128124
})
129125
}
130126
}
@@ -137,13 +133,11 @@ impl RepoShardedProcessExecutor for AsyncMethodRequestWorker {
137133
/// will_exit atomic bool is a flag to prevent the worker from grabbing new
138134
/// items from the queue and gracefully terminate.
139135
async fn execute(&self) -> Result<()> {
140-
// Start the stats logger loop.
141-
// Each shard reports metrics only for its own repo to avoid
142-
// cross-shard counter races and stale values on shard reassignment.
136+
// Start the stats logger loop
143137
let (stats, stats_abort_handle) = abortable({
144138
cloned!(self.ctx, self.queue);
145-
let repo_id = self.repo_id;
146-
async move { stats_loop(&ctx, repo_id, &queue).await }
139+
let repo_ids = self.mononoke.known_repo_ids().clone();
140+
async move { stats_loop(&ctx, repo_ids, &queue).await }
147141
});
148142
let _stats = mononoke::spawn_task(stats);
149143

0 commit comments

Comments (0)