55 * GNU General Public License version 2.
66 */
77
8- use std:: collections:: HashMap ;
98use std:: time:: Duration ;
109
1110use async_requests:: AsyncMethodRequestQueue ;
1211use async_requests:: types:: RequestStatus ;
1312use context:: CoreContext ;
1413use mononoke_api:: RepositoryId ;
1514use mononoke_types:: Timestamp ;
16- use requests_table:: QueueStatsEntry ;
1715use stats:: define_stats;
1816use stats:: prelude:: * ;
1917use tracing:: warn;
@@ -37,9 +35,19 @@ define_stats! {
3735 queue_age_by_repo_and_status: dynamic_singleton_counter( "queue.{}.age_s.{}" , ( repo_id: String , status: String ) ) ,
3836}
3937
38+ /// Report queue stats for this shard's repo only.
39+ ///
40+ /// Each shard reports metrics only for its own repo_id. This avoids
41+ /// cross-shard racing on shared counters and prevents stale values
42+ /// when a shard is reassigned: the counter simply goes absent (no
43+ /// data) rather than being stuck at a stale non-zero value. A dead
44+ /// shard detector should alert on absence of data.
45+ ///
46+ /// For the non-sharded code path, `repo_id` is `None` and only the
47+ /// global (non-per-repo) counters are reported.
4048pub ( crate ) async fn stats_loop (
4149 ctx : & CoreContext ,
42- repo_ids : Vec < RepositoryId > ,
50+ repo_id : Option < RepositoryId > ,
4351 queue : & AsyncMethodRequestQueue ,
4452) {
4553 loop {
@@ -49,8 +57,10 @@ pub(crate) async fn stats_loop(
4957 Ok ( res) => {
5058 process_queue_length_by_status ( ctx, & res) ;
5159 process_queue_age_by_status ( ctx, now, & res) ;
52- process_queue_length_by_repo_and_status ( ctx, & repo_ids, & res) ;
53- process_queue_age_by_repo_and_status ( ctx, & repo_ids, now, & res) ;
60+ if let Some ( repo_id) = & repo_id {
61+ process_queue_length_by_repo ( ctx, repo_id, & res) ;
62+ process_queue_age_by_repo ( ctx, repo_id, now, & res) ;
63+ }
5464 }
5565 Err ( err) => {
5666 STATS :: stats_error. add_value ( 1 ) ;
@@ -62,51 +72,22 @@ pub(crate) async fn stats_loop(
6272 }
6373}
6474
65- // Keep track of the stats we have already logged. Any missing ones, we will log a 0.
66- struct Seen {
67- inner : HashMap < QueueStatsEntry , bool > ,
68- }
69-
70- impl Seen {
71- fn new ( repo_ids : & Vec < RepositoryId > ) -> Self {
72- let mut seen = HashMap :: new ( ) ;
73- for status in STATUSES {
74- for repo_id in repo_ids {
75- seen. insert (
76- QueueStatsEntry {
77- repo_id : Some ( repo_id. clone ( ) ) ,
78- status,
79- } ,
80- false ,
81- ) ;
82- }
83- }
84- Self { inner : seen }
85- }
86-
87- fn mark ( & mut self , repo_id : Option < RepositoryId > , status : RequestStatus ) {
88- self . inner . insert ( QueueStatsEntry { repo_id, status } , true ) ;
89- }
90-
91- fn get_missing ( & self ) -> Vec < QueueStatsEntry > {
92- self . inner
93- . iter ( )
94- . filter_map ( |( entry, seen) | if !* seen { Some ( entry) } else { None } )
95- . cloned ( )
96- . collect ( )
97- }
98- }
99-
10075fn process_queue_length_by_status ( ctx : & CoreContext , res : & requests_table:: QueueStats ) {
101- let mut seen = Seen :: new ( & vec ! [ ] ) ;
76+ // Report whatever the DB returns for global stats, then zero out
77+ // any of the 4 statuses that were absent from the result.
78+ let mut seen = [ false ; 4 ] ;
10279 let stats = & res. queue_length_by_status ;
10380 for ( status, count) in stats {
104- seen. mark ( None , * status) ;
81+ if let Some ( idx) = status_index ( status) {
82+ seen[ idx] = true ;
83+ }
10584 STATS :: queue_length_by_status. set_value ( ctx. fb , * count as i64 , ( status. to_string ( ) , ) ) ;
10685 }
10786
108- for entry in seen. get_missing ( ) {
109- STATS :: queue_length_by_status. set_value ( ctx. fb , 0 , ( entry. status . to_string ( ) , ) ) ;
87+ for ( idx, was_seen) in seen. iter ( ) . enumerate ( ) {
88+ if !was_seen {
89+ STATS :: queue_length_by_status. set_value ( ctx. fb , 0 , ( STATUSES [ idx] . to_string ( ) , ) ) ;
90+ }
11091 }
11192}
11293
@@ -115,79 +96,93 @@ fn process_queue_age_by_status(
11596 now : Timestamp ,
11697 res : & requests_table:: QueueStats ,
11798) {
118- let mut seen = Seen :: new ( & vec ! [ ] ) ;
99+ let mut seen = [ false ; 4 ] ;
119100 let stats = & res. queue_age_by_status ;
120101 for ( status, ts) in stats {
121- seen. mark ( None , * status) ;
102+ if let Some ( idx) = status_index ( status) {
103+ seen[ idx] = true ;
104+ }
122105 let diff = std:: cmp:: max ( now. timestamp_seconds ( ) - ts. timestamp_seconds ( ) , 0 ) ;
123106 STATS :: queue_age_by_status. set_value ( ctx. fb , diff, ( status. to_string ( ) , ) ) ;
124107 }
125108
126- for entry in seen. get_missing ( ) {
127- STATS :: queue_age_by_status. set_value ( ctx. fb , 0 , ( entry. status . to_string ( ) , ) ) ;
109+ for ( idx, was_seen) in seen. iter ( ) . enumerate ( ) {
110+ if !was_seen {
111+ STATS :: queue_age_by_status. set_value ( ctx. fb , 0 , ( STATUSES [ idx] . to_string ( ) , ) ) ;
112+ }
128113 }
129114}
130115
131- fn process_queue_length_by_repo_and_status (
116+ /// Report per-repo queue length for this shard's repo only.
117+ /// If the DB returns no data for a status, we report 0.
118+ fn process_queue_length_by_repo (
132119 ctx : & CoreContext ,
133- repo_ids : & Vec < RepositoryId > ,
120+ repo_id : & RepositoryId ,
134121 res : & requests_table:: QueueStats ,
135122) {
136- let mut seen = Seen :: new ( repo_ids) ;
123+ let mut seen = [ false ; 4 ] ;
124+ let repo_id_str = repo_id. to_string ( ) ;
137125 let stats = & res. queue_length_by_repo_and_status ;
138126 for ( entry, count) in stats {
139- seen. mark ( entry. repo_id , entry. status ) ;
140- STATS :: queue_length_by_repo_and_status. set_value (
141- ctx. fb ,
142- * count as i64 ,
143- (
144- entry. repo_id . unwrap_or ( RepositoryId :: new ( 0 ) ) . to_string ( ) ,
145- entry. status . to_string ( ) ,
146- ) ,
147- ) ;
127+ if entry. repo_id . as_ref ( ) == Some ( repo_id) {
128+ if let Some ( idx) = status_index ( & entry. status ) {
129+ seen[ idx] = true ;
130+ }
131+ STATS :: queue_length_by_repo_and_status. set_value (
132+ ctx. fb ,
133+ * count as i64 ,
134+ ( repo_id_str. clone ( ) , entry. status . to_string ( ) ) ,
135+ ) ;
136+ }
148137 }
149138
150- for entry in seen. get_missing ( ) {
151- STATS :: queue_length_by_repo_and_status. set_value (
152- ctx. fb ,
153- 0 ,
154- (
155- entry. repo_id . unwrap_or ( RepositoryId :: new ( 0 ) ) . to_string ( ) ,
156- entry. status . to_string ( ) ,
157- ) ,
158- ) ;
139+ for ( idx, was_seen) in seen. iter ( ) . enumerate ( ) {
140+ if !was_seen {
141+ STATS :: queue_length_by_repo_and_status. set_value (
142+ ctx. fb ,
143+ 0 ,
144+ ( repo_id_str. clone ( ) , STATUSES [ idx] . to_string ( ) ) ,
145+ ) ;
146+ }
159147 }
160148}
161149
162- fn process_queue_age_by_repo_and_status (
150+ /// Report per-repo queue age for this shard's repo only.
151+ /// If the DB returns no data for a status, we report 0.
152+ fn process_queue_age_by_repo (
163153 ctx : & CoreContext ,
164- repo_ids : & Vec < RepositoryId > ,
154+ repo_id : & RepositoryId ,
165155 now : Timestamp ,
166156 res : & requests_table:: QueueStats ,
167157) {
168- let mut seen = Seen :: new ( repo_ids) ;
158+ let mut seen = [ false ; 4 ] ;
159+ let repo_id_str = repo_id. to_string ( ) ;
169160 let stats = & res. queue_age_by_repo_and_status ;
170161 for ( entry, ts) in stats {
171- seen. mark ( entry. repo_id , entry. status ) ;
172- let diff = std:: cmp:: max ( now. timestamp_seconds ( ) - ts. timestamp_seconds ( ) , 0 ) ;
173- STATS :: queue_age_by_repo_and_status. set_value (
174- ctx. fb ,
175- diff,
176- (
177- entry. repo_id . unwrap_or ( RepositoryId :: new ( 0 ) ) . to_string ( ) ,
178- entry. status . to_string ( ) ,
179- ) ,
180- ) ;
162+ if entry. repo_id . as_ref ( ) == Some ( repo_id) {
163+ if let Some ( idx) = status_index ( & entry. status ) {
164+ seen[ idx] = true ;
165+ }
166+ let diff = std:: cmp:: max ( now. timestamp_seconds ( ) - ts. timestamp_seconds ( ) , 0 ) ;
167+ STATS :: queue_age_by_repo_and_status. set_value (
168+ ctx. fb ,
169+ diff,
170+ ( repo_id_str. clone ( ) , entry. status . to_string ( ) ) ,
171+ ) ;
172+ }
181173 }
182174
183- for entry in seen. get_missing ( ) {
184- STATS :: queue_age_by_repo_and_status. set_value (
185- ctx. fb ,
186- 0 ,
187- (
188- entry. repo_id . unwrap_or ( RepositoryId :: new ( 0 ) ) . to_string ( ) ,
189- entry. status . to_string ( ) ,
190- ) ,
191- ) ;
175+ for ( idx, was_seen) in seen. iter ( ) . enumerate ( ) {
176+ if !was_seen {
177+ STATS :: queue_age_by_repo_and_status. set_value (
178+ ctx. fb ,
179+ 0 ,
180+ ( repo_id_str. clone ( ) , STATUSES [ idx] . to_string ( ) ) ,
181+ ) ;
182+ }
192183 }
193184}
185+
186+ fn status_index ( status : & RequestStatus ) -> Option < usize > {
187+ STATUSES . iter ( ) . position ( |s| s == status)
188+ }
0 commit comments