@@ -10,10 +10,18 @@ use std::sync::atomic::AtomicBool;
1010use std:: sync:: atomic:: Ordering ;
1111use std:: sync:: Arc ;
1212
13+ use anyhow:: anyhow;
14+ use anyhow:: Error ;
1315use anyhow:: Result ;
16+ use bytes:: BytesMut ;
1417use bytesize:: ByteSize ;
18+ use cloned:: cloned;
1519use context:: CoreContext ;
20+ use edenapi_types:: AnyFileContentId ;
1621use futures:: channel:: oneshot;
22+ use futures:: stream;
23+ use futures:: StreamExt ;
24+ use futures:: TryStreamExt ;
1725use mononoke_macros:: mononoke;
1826use mononoke_types:: ContentId ;
1927use repo_blobstore:: RepoBlobstore ;
@@ -63,21 +71,41 @@ impl ContentManager {
6371 }
6472
6573 async fn flush_batch (
66- _ctx : CoreContext ,
74+ ctx : CoreContext ,
75+ repo_blobstore : RepoBlobstore ,
6776 content_es : & Arc < dyn EdenapiSender + Send + Sync > ,
6877 current_batch : & mut Vec < ContentId > ,
6978 current_batch_size : u64 ,
7079 pending_messages : & mut VecDeque < oneshot:: Sender < Result < ( ) , anyhow:: Error > > > ,
7180 reponame : String ,
7281 logger : & Logger ,
7382 ) -> Result < ( ) , anyhow:: Error > {
74- let current_batch_len = current_batch. len ( ) as i64 ;
83+ let current_batch_len = current_batch. len ( ) ;
7584 let start = std:: time:: Instant :: now ( ) ;
85+
7686 if current_batch_len > 0 {
77- if let Err ( e) = content_es
78- . upload_contents ( std:: mem:: take ( current_batch) )
79- . await
80- {
87+ let contents = std:: mem:: take ( current_batch) ;
88+
89+ let full_items = stream:: iter ( contents)
90+ . map ( |id| {
91+ cloned ! ( ctx, repo_blobstore) ;
92+ async move {
93+ let bytes = filestore:: fetch ( repo_blobstore, ctx, & id. into ( ) )
94+ . await ?
95+ . ok_or ( anyhow ! ( "Content is not found (which should never happen" ) ) ?
96+ . try_collect :: < BytesMut > ( )
97+ . await ?;
98+ Ok :: < _ , Error > ( (
99+ AnyFileContentId :: ContentId ( id. into ( ) ) ,
100+ bytes. freeze ( ) . into ( ) ,
101+ ) )
102+ }
103+ } )
104+ . buffer_unordered ( current_batch_len)
105+ . try_collect :: < Vec < ( AnyFileContentId , minibytes:: Bytes ) > > ( )
106+ . await ?;
107+
108+ if let Err ( e) = content_es. upload_contents ( full_items) . await {
81109 error ! ( logger, "Error processing content: {:?}" , e) ;
82110 return Err ( e) ;
83111 } else {
@@ -91,7 +119,7 @@ impl ContentManager {
91119
92120 let elapsed = start. elapsed ( ) . as_millis ( ) / current_batch_len as u128 ;
93121 STATS :: content_upload_time_s. add_value ( elapsed as i64 , ( reponame. clone ( ) , ) ) ;
94- STATS :: synced_contents. add_value ( current_batch_len, ( reponame. clone ( ) , ) ) ;
122+ STATS :: synced_contents. add_value ( current_batch_len as i64 , ( reponame. clone ( ) , ) ) ;
95123 }
96124 }
97125
@@ -144,6 +172,7 @@ impl Manager for ContentManager {
144172 if current_batch_size >= MAX_BLOB_BYTES || current_batch. len( ) >= MAX_CONTENT_BATCH_SIZE {
145173 if let Err ( e) = ContentManager :: flush_batch(
146174 ctx. clone( ) ,
175+ self . repo_blobstore. clone( ) ,
147176 & content_es,
148177 & mut current_batch,
149178 current_batch_size,
@@ -161,6 +190,7 @@ impl Manager for ContentManager {
161190 if current_batch_size > 0 || !pending_messages. is_empty( ) {
162191 if let Err ( e) = ContentManager :: flush_batch(
163192 ctx. clone( ) ,
193+ self . repo_blobstore. clone( ) ,
164194 & content_es,
165195 & mut current_batch,
166196 current_batch_size,
0 commit comments