diff --git a/vortex-array/src/arrays/dict/compute/rules.rs b/vortex-array/src/arrays/dict/compute/rules.rs index b4804218e37..e4d102ee861 100644 --- a/vortex-array/src/arrays/dict/compute/rules.rs +++ b/vortex-array/src/arrays/dict/compute/rules.rs @@ -9,12 +9,15 @@ use crate::EqMode; use crate::IntoArray; use crate::array::ArrayView; use crate::array::VTable; +use crate::arrays::Chunked; +use crate::arrays::ChunkedArray; use crate::arrays::Constant; use crate::arrays::ConstantArray; use crate::arrays::Dict; use crate::arrays::DictArray; use crate::arrays::ScalarFn; use crate::arrays::ScalarFnArray; +use crate::arrays::chunked::ChunkedArrayExt; use crate::arrays::dict::DictArrayExt; use crate::arrays::dict::DictArraySlotsExt; use crate::arrays::filter::FilterReduceAdaptor; @@ -37,11 +40,59 @@ pub(crate) const PARENT_RULES: ParentRuleSet = ParentRuleSet::new(&[ ParentRuleSet::lift(&CastReduceAdaptor(Dict)), ParentRuleSet::lift(&MaskReduceAdaptor(Dict)), ParentRuleSet::lift(&LikeReduceAdaptor(Dict)), + ParentRuleSet::lift(&DictionaryChunkedValuesPullUpRule), ParentRuleSet::lift(&DictionaryScalarFnValuesPushDownRule), ParentRuleSet::lift(&DictionaryScalarFnCodesPullUpRule), ParentRuleSet::lift(&SliceReduceAdaptor(Dict)), ]); +/// Pull a common dictionary values array above chunked dictionary codes. +/// +/// Rewrites `Chunked>` into `Dict, values>` only when +/// every child dictionary shares the exact same values array allocation. +#[derive(Debug)] +struct DictionaryChunkedValuesPullUpRule; + +impl ArrayParentReduceRule for DictionaryChunkedValuesPullUpRule { + type Parent = Chunked; + + fn reduce_parent( + &self, + array: ArrayView<'_, Dict>, + parent: ArrayView<'_, Chunked>, + _child_idx: usize, + ) -> VortexResult> { + let values = array.values(); + let codes_dtype = array.codes().dtype().clone(); + let mut code_chunks = Vec::with_capacity(parent.nchunks()); + let mut all_values_referenced = array.has_all_values_referenced(); + + for chunk in parent.iter_chunks() { + let Some(dict) = chunk.as_opt::() else { + return Ok(None); + }; + if dict.codes().dtype() != &codes_dtype { + return Ok(None); + } + if !ArrayRef::ptr_eq(dict.values(), values) { + return Ok(None); + } + all_values_referenced |= dict.has_all_values_referenced(); + code_chunks.push(dict.codes().clone()); + } + + let codes = ChunkedArray::try_new(code_chunks, codes_dtype)?.into_array(); + let dict = DictArray::try_new(codes, values.clone())?; + let dict = if all_values_referenced { + unsafe { dict.set_all_values_referenced(true) } + } else { + dict + }; + + Ok(Some(dict.into_array())) + } +} + /// Push down a scalar function to run only over the values of a dictionary array. #[derive(Debug)] struct DictionaryScalarFnValuesPushDownRule; @@ -214,16 +265,72 @@ mod tests { use vortex_buffer::buffer; use vortex_error::VortexResult; + use crate::ArrayRef; use crate::IntoArray; use crate::arrays::BoolArray; + use crate::arrays::Chunked; + use crate::arrays::ChunkedArray; use crate::arrays::Dict; use crate::arrays::DictArray; + use crate::arrays::PrimitiveArray; + use crate::arrays::chunked::ChunkedArrayExt; use crate::arrays::dict::DictArrayExt; + use crate::arrays::dict::DictArraySlotsExt; use crate::arrays::scalar_fn::ScalarFnFactoryExt; + use crate::assert_arrays_eq; + use crate::executor::VortexSessionExecute; use crate::optimizer::ArrayOptimizer; use crate::scalar_fn::EmptyOptions; use crate::scalar_fn::fns::not::Not; + #[test] + fn chunked_dict_with_shared_values_pulls_values_up() -> VortexResult<()> { + let values = buffer![10u32, 20, 30].into_array(); + let chunk0 = DictArray::try_new(buffer![0u8, 1].into_array(), values.clone())?.into_array(); + let chunk1 = + DictArray::try_new(buffer![2u8, 0, 1].into_array(), values.clone())?.into_array(); + let array = + ChunkedArray::try_new(vec![chunk0, chunk1], values.dtype().clone())?.into_array(); + + let optimized = array.optimize()?; + let dict = optimized.as_::(); + let codes = dict.codes().as_::(); + + assert!(ArrayRef::ptr_eq(dict.values(), &values)); + assert_eq!(codes.nchunks(), 2); + let mut ctx = crate::LEGACY_SESSION.create_execution_ctx(); + assert_arrays_eq!( + optimized, + PrimitiveArray::from_iter([10u32, 20, 30, 10, 20]), + &mut ctx + ); + + Ok(()) + } + + #[test] + fn chunked_dict_with_distinct_values_stays_chunked() -> VortexResult<()> { + let values0 = buffer![10u32, 20, 30].into_array(); + let values1 = buffer![10u32, 20, 30].into_array(); + let chunk0 = + DictArray::try_new(buffer![0u8, 1].into_array(), values0.clone())?.into_array(); + let chunk1 = DictArray::try_new(buffer![2u8, 0, 1].into_array(), values1)?.into_array(); + let array = + ChunkedArray::try_new(vec![chunk0, chunk1], values0.dtype().clone())?.into_array(); + + let optimized = array.optimize()?; + + assert!(optimized.is::()); + let mut ctx = crate::LEGACY_SESSION.create_execution_ctx(); + assert_arrays_eq!( + optimized, + PrimitiveArray::from_iter([10u32, 20, 30, 10, 20]), + &mut ctx + ); + + Ok(()) + } + #[test] fn scalar_fn_values_pushdown_preserves_all_values_referenced() -> VortexResult<()> { let dict = unsafe { diff --git a/vortex-array/src/arrays/dict/vtable/kernel.rs b/vortex-array/src/arrays/dict/vtable/kernel.rs index ab750f7d663..79659af18dd 100644 --- a/vortex-array/src/arrays/dict/vtable/kernel.rs +++ b/vortex-array/src/arrays/dict/vtable/kernel.rs @@ -4,6 +4,7 @@ use vortex_session::VortexSession; use crate::ArrayVTable; +use crate::arrays::Chunked; use crate::arrays::Dict; use crate::arrays::dict::TakeExecuteAdaptor; use crate::optimizer::kernels::ArrayKernelsExt; @@ -16,6 +17,7 @@ use crate::scalar_fn::fns::fill_null::FillNullExecuteAdaptor; pub(crate) fn initialize(session: &VortexSession) { let kernels = session.kernels(); kernels.register_execute_parent_kernel(Binary.id(), Dict, CompareExecuteAdaptor(Dict)); + kernels.register_execute_parent_kernel(Dict.id(), Chunked, TakeExecuteAdaptor(Chunked)); kernels.register_execute_parent_kernel(Dict.id(), Dict, TakeExecuteAdaptor(Dict)); kernels.register_execute_parent_kernel(FillNull.id(), Dict, FillNullExecuteAdaptor(Dict)); }