-
Notifications
You must be signed in to change notification settings - Fork 2k
perf : Optimize count distinct using bitmaps instead of hashsets for smaller datatypes #21456
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 17 commits
8d49dfe
0b179ff
c6095ab
9d06408
f185fdc
f7c487a
3f091d9
e09f68a
f8c01a1
df90ef5
bf0f95c
48a6029
61dc8e1
5a2918a
3f4952e
289b354
554f60c
87a9af8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -165,3 +165,354 @@ impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulato | |
| size_of_val(self) + self.values.size() | ||
| } | ||
| } | ||
|
|
||
| /// Optimized COUNT DISTINCT accumulator for u8 using a bool array. | ||
| /// Uses 256 bytes to track all possible u8 values. | ||
| #[derive(Debug)] | ||
| pub struct BoolArray256DistinctCountAccumulator { | ||
| seen: [bool; 256], | ||
| } | ||
|
|
||
| impl BoolArray256DistinctCountAccumulator { | ||
| pub fn new() -> Self { | ||
| Self { seen: [false; 256] } | ||
| } | ||
|
|
||
| #[inline] | ||
| fn count(&self) -> i64 { | ||
| self.seen.iter().filter(|&&b| b).count() as i64 | ||
| } | ||
| } | ||
|
|
||
| impl Default for BoolArray256DistinctCountAccumulator { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl Accumulator for BoolArray256DistinctCountAccumulator { | ||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if values.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_primitive_array::<arrow::datatypes::UInt8Type>(&values[0])?; | ||
| for value in arr.iter().flatten() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this need to be checking for null values?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. flatten should take care of skipping null values so we should be good here
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be optimized further for the no-nulls case: if arr.null_count() == 0 {
for &value in arr.values() {
self.seen[value as usize] = true;
}
} else {
for value in arr.iter().flatten() {
self.seen[value as usize] = true;
}
} |
||
| self.seen[value as usize] = true; | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if states.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_list_array(&states[0])?; | ||
| arr.iter().try_for_each(|maybe_list| { | ||
| if let Some(list) = maybe_list { | ||
| let list = as_primitive_array::<arrow::datatypes::UInt8Type>(&list)?; | ||
| for value in list.values().iter() { | ||
| self.seen[*value as usize] = true; | ||
| } | ||
| }; | ||
| Ok(()) | ||
| }) | ||
| } | ||
|
|
||
| fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> { | ||
| let values: Vec<u8> = self | ||
| .seen | ||
| .iter() | ||
| .enumerate() | ||
| .filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None }) | ||
| .collect(); | ||
|
|
||
| let arr = Arc::new( | ||
| PrimitiveArray::<arrow::datatypes::UInt8Type>::from_iter_values(values), | ||
| ); | ||
| Ok(vec![ | ||
| SingleRowListArrayBuilder::new(arr).build_list_scalar(), | ||
| ]) | ||
| } | ||
|
|
||
| fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> { | ||
| Ok(ScalarValue::Int64(Some(self.count()))) | ||
| } | ||
|
|
||
| fn size(&self) -> usize { | ||
| size_of_val(self) + 256 | ||
coderfender marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| /// Optimized COUNT DISTINCT accumulator for i8 using a bool array. | ||
| /// Uses 256 bytes to track all possible i8 values (mapped to 0..255). | ||
| #[derive(Debug)] | ||
| pub struct BoolArray256DistinctCountAccumulatorI8 { | ||
| seen: [bool; 256], | ||
coderfender marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| impl BoolArray256DistinctCountAccumulatorI8 { | ||
| pub fn new() -> Self { | ||
| Self { seen: [false; 256] } | ||
| } | ||
|
|
||
| #[inline] | ||
| fn count(&self) -> i64 { | ||
| self.seen.iter().filter(|&&b| b).count() as i64 | ||
| } | ||
| } | ||
|
|
||
| impl Default for BoolArray256DistinctCountAccumulatorI8 { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl Accumulator for BoolArray256DistinctCountAccumulatorI8 { | ||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if values.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_primitive_array::<arrow::datatypes::Int8Type>(&values[0])?; | ||
| for value in arr.iter().flatten() { | ||
coderfender marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self.seen[value as u8 as usize] = true; | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if states.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_list_array(&states[0])?; | ||
| arr.iter().try_for_each(|maybe_list| { | ||
| if let Some(list) = maybe_list { | ||
| let list = as_primitive_array::<arrow::datatypes::Int8Type>(&list)?; | ||
| for value in list.values().iter() { | ||
| self.seen[*value as u8 as usize] = true; | ||
| } | ||
| }; | ||
| Ok(()) | ||
| }) | ||
| } | ||
|
|
||
| fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> { | ||
| let values: Vec<i8> = self | ||
| .seen | ||
| .iter() | ||
| .enumerate() | ||
| .filter_map( | ||
| |(idx, &seen)| { | ||
| if seen { Some(idx as u8 as i8) } else { None } | ||
| }, | ||
| ) | ||
| .collect(); | ||
|
|
||
| let arr = Arc::new( | ||
| PrimitiveArray::<arrow::datatypes::Int8Type>::from_iter_values(values), | ||
| ); | ||
| Ok(vec![ | ||
| SingleRowListArrayBuilder::new(arr).build_list_scalar(), | ||
| ]) | ||
| } | ||
|
|
||
| fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> { | ||
| Ok(ScalarValue::Int64(Some(self.count()))) | ||
| } | ||
|
|
||
| fn size(&self) -> usize { | ||
| size_of_val(self) + 256 | ||
| } | ||
| } | ||
|
|
||
| /// Optimized COUNT DISTINCT accumulator for u16 using a 65536-bit bitmap. | ||
| /// Uses 8KB (1024 x u64) to track all possible u16 values. | ||
| #[derive(Debug)] | ||
| pub struct Bitmap65536DistinctCountAccumulator { | ||
| bitmap: Box<[u64; 1024]>, | ||
| } | ||
|
|
||
| impl Bitmap65536DistinctCountAccumulator { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| bitmap: Box::new([0; 1024]), | ||
| } | ||
| } | ||
|
|
||
| #[inline] | ||
| fn set_bit(&mut self, value: u16) { | ||
| let word = (value / 64) as usize; | ||
| let bit = value % 64; | ||
| self.bitmap[word] |= 1u64 << bit; | ||
| } | ||
|
|
||
| #[inline] | ||
| fn count(&self) -> i64 { | ||
| self.bitmap.iter().map(|w| w.count_ones() as i64).sum() | ||
| } | ||
| } | ||
|
|
||
| impl Default for Bitmap65536DistinctCountAccumulator { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl Accumulator for Bitmap65536DistinctCountAccumulator { | ||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if values.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_primitive_array::<arrow::datatypes::UInt16Type>(&values[0])?; | ||
| for value in arr.iter().flatten() { | ||
| self.set_bit(value); | ||
| } | ||
coderfender marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Ok(()) | ||
| } | ||
|
|
||
| fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if states.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_list_array(&states[0])?; | ||
| arr.iter().try_for_each(|maybe_list| { | ||
| if let Some(list) = maybe_list { | ||
| let list = as_primitive_array::<arrow::datatypes::UInt16Type>(&list)?; | ||
| for value in list.values().iter() { | ||
| self.set_bit(*value); | ||
| } | ||
| }; | ||
| Ok(()) | ||
| }) | ||
| } | ||
|
|
||
| fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> { | ||
| let mut values = Vec::new(); | ||
| for (word_idx, &word) in self.bitmap.iter().enumerate() { | ||
| if word != 0 { | ||
| for bit in 0..64 { | ||
| if (word & (1u64 << bit)) != 0 { | ||
| values.push((word_idx as u16) * 64 + bit); | ||
| } | ||
| } | ||
| } | ||
| } | ||
coderfender marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| let arr = Arc::new( | ||
| PrimitiveArray::<arrow::datatypes::UInt16Type>::from_iter_values(values), | ||
| ); | ||
| Ok(vec![ | ||
| SingleRowListArrayBuilder::new(arr).build_list_scalar(), | ||
| ]) | ||
| } | ||
|
|
||
| fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> { | ||
| Ok(ScalarValue::Int64(Some(self.count()))) | ||
| } | ||
|
|
||
| fn size(&self) -> usize { | ||
| size_of_val(self) + 8192 | ||
| } | ||
| } | ||
|
|
||
| /// Optimized COUNT DISTINCT accumulator for i16 using a 65536-bit bitmap. | ||
| /// Uses 8KB (1024 x u64) to track all possible i16 values (mapped to 0..65535). | ||
| #[derive(Debug)] | ||
| pub struct Bitmap65536DistinctCountAccumulatorI16 { | ||
| bitmap: Box<[u64; 1024]>, | ||
| } | ||
|
|
||
| impl Bitmap65536DistinctCountAccumulatorI16 { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| bitmap: Box::new([0; 1024]), | ||
| } | ||
| } | ||
|
|
||
| #[inline] | ||
| fn set_bit(&mut self, value: i16) { | ||
| let idx = value as u16; | ||
| let word = (idx / 64) as usize; | ||
| let bit = idx % 64; | ||
| self.bitmap[word] |= 1u64 << bit; | ||
| } | ||
|
|
||
| #[inline] | ||
| fn count(&self) -> i64 { | ||
| self.bitmap.iter().map(|w| w.count_ones() as i64).sum() | ||
| } | ||
| } | ||
|
|
||
| impl Default for Bitmap65536DistinctCountAccumulatorI16 { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 { | ||
| #[inline(never)] | ||
| fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if values.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_primitive_array::<arrow::datatypes::Int16Type>(&values[0])?; | ||
| for value in arr.iter().flatten() { | ||
| self.set_bit(value); | ||
| } | ||
coderfender marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Ok(()) | ||
| } | ||
|
|
||
| fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { | ||
| if states.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let arr = as_list_array(&states[0])?; | ||
| arr.iter().try_for_each(|maybe_list| { | ||
| if let Some(list) = maybe_list { | ||
| let list = as_primitive_array::<arrow::datatypes::Int16Type>(&list)?; | ||
| for value in list.values().iter() { | ||
| self.set_bit(*value); | ||
| } | ||
| }; | ||
| Ok(()) | ||
| }) | ||
| } | ||
|
|
||
| fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> { | ||
| let mut values = Vec::new(); | ||
| for (word_idx, &word) in self.bitmap.iter().enumerate() { | ||
| if word != 0 { | ||
| for bit in 0..64 { | ||
| if (word & (1u64 << bit)) != 0 { | ||
| values.push(((word_idx as u16) * 64 + bit) as i16); | ||
| } | ||
| } | ||
| } | ||
| } | ||
coderfender marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| let arr = Arc::new( | ||
| PrimitiveArray::<arrow::datatypes::Int16Type>::from_iter_values(values), | ||
| ); | ||
| Ok(vec![ | ||
| SingleRowListArrayBuilder::new(arr).build_list_scalar(), | ||
| ]) | ||
| } | ||
|
|
||
| fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> { | ||
| Ok(ScalarValue::Int64(Some(self.count()))) | ||
| } | ||
|
|
||
| fn size(&self) -> usize { | ||
| size_of_val(self) + 8192 | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.