@@ -6,16 +6,13 @@ use std::{
66 fs,
77 io:: { BufRead , BufReader , Read , stdin} ,
88 path:: PathBuf ,
9- sync:: {
10- Arc , Mutex ,
11- mpsc:: { Receiver , Sender , channel} ,
12- } ,
9+ sync:: mpsc:: { Receiver , Sender , channel} ,
1310 thread,
1411} ;
1512
1613use crate :: {
1714 SkywayError ,
18- chunks:: { Chunk , ChunkBuilder , ElementChunk } ,
15+ chunks:: { ChunkBuilder , ElementChunk } ,
1916 elements:: Metadata ,
2017 sort:: { ElementSorter , SortStrategy } ,
2118} ;
@@ -172,7 +169,7 @@ pub trait Reader: Sized + Clone + Send + 'static {
172169 // and run those resulting ElementChunks through the combined_filter.
173170 let chunk_iterator = self
174171 . read_file ( source, metadata_sender, chunk_builder)
175- . map ( |chunk| combined_filter ( chunk) ) ;
172+ . map ( |chunk| combined_filter ( chunk, Vec :: new ( ) ) ) ;
176173
177174 // Send out each of those filtered ElementChunks through the correct channel.
178175 chunk_iterator. for_each ( |chunk| {
@@ -225,10 +222,6 @@ pub trait Reader: Sized + Clone + Send + 'static {
225222 // Please note that this is where all filtering happens!
226223 let keep_ids = build_keep_list ( & filters, first_filter_chunk_receiver) ;
227224
228- // Make keep_ids thread-safe
229- // TODO: Consider performance implications of this.
230- let keep_ids = Arc :: new ( Mutex :: new ( keep_ids) ) ;
231-
232225 // "Fake" channel that we won't use, to make the reader thread happy.
233226 // We already read the metadata the first time around.
234227 let ( fake_metadata_sender, fake_metadata_receiver) = channel ( ) ;
@@ -250,37 +243,13 @@ pub trait Reader: Sized + Clone + Send + 'static {
250243 // It is imperative that we re-run the filters, because even though
251244 // we already know which elements we want to keep, we don't know
252245 // what modifications the filters might make to those elements.
253- . map ( |chunk| combined_filter ( chunk) )
254- // keep elements depending on if we determined they are necessary above
255- . for_each ( move |chunk| {
256- // Vec that will hold each element we keep from this ElementChunk.
257- let mut elements = Vec :: new ( ) ;
258-
259- // We are iterating through a ParallelIterator, which is why I am
260- // locking keep_ids here. I reckon there are better ways of
261- // accomplishing this!
262- let mut keep_ids_lock = keep_ids. lock ( ) . unwrap ( ) ;
263-
264- // For each element, push it to the elements Vec (to keep) if it
265- // existed in the keep_ids Vec. At the same time, remove the ID
266- // from keep_ids.
267- //
268- // TODO: is it really necessary to delete it? Are we really going
269- // to check if keep_ids is fully exhausted in the end?
270- for element in chunk. content {
271- if keep_ids_lock. remove ( & element. id ) {
272- elements. push ( element) ;
273- }
274- }
275- // Drop the lock so parallel processes can have their fun with keep_ids
276- drop ( keep_ids_lock) ;
277-
278- // Send elements Vec out, neatly packaged as a Chunk
246+ // We pass keep_ids to combined_filter so that it keeps referenced
247+ // elements.
248+ . map ( |chunk| combined_filter ( chunk, keep_ids. clone ( ) ) )
249+ // Send chunk out for sorting
250+ . for_each ( |chunk| {
279251 filter_chunk_sender
280- . send ( Chunk {
281- content : elements. into_boxed_slice ( ) ,
282- index : chunk. index ,
283- } )
252+ . send ( chunk)
284253 . expect ( "Unable to send chunk." )
285254 } ) ;
286255
0 commit comments