Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions vortex-array/src/arrays/dict/compute/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use crate::EqMode;
use crate::IntoArray;
use crate::array::ArrayView;
use crate::array::VTable;
use crate::arrays::Chunked;
use crate::arrays::ChunkedArray;
use crate::arrays::Constant;
use crate::arrays::ConstantArray;
use crate::arrays::Dict;
use crate::arrays::DictArray;
use crate::arrays::ScalarFn;
use crate::arrays::ScalarFnArray;
use crate::arrays::chunked::ChunkedArrayExt;
use crate::arrays::dict::DictArrayExt;
use crate::arrays::dict::DictArraySlotsExt;
use crate::arrays::filter::FilterReduceAdaptor;
Expand All @@ -37,11 +40,59 @@ pub(crate) const PARENT_RULES: ParentRuleSet<Dict> = ParentRuleSet::new(&[
ParentRuleSet::lift(&CastReduceAdaptor(Dict)),
ParentRuleSet::lift(&MaskReduceAdaptor(Dict)),
ParentRuleSet::lift(&LikeReduceAdaptor(Dict)),
ParentRuleSet::lift(&DictionaryChunkedValuesPullUpRule),
ParentRuleSet::lift(&DictionaryScalarFnValuesPushDownRule),
ParentRuleSet::lift(&DictionaryScalarFnCodesPullUpRule),
ParentRuleSet::lift(&SliceReduceAdaptor(Dict)),
]);

/// Pull a common dictionary values array above chunked dictionary codes.
///
/// Rewrites `Chunked<Dict<codes_i, values>>` into `Dict<Chunked<codes_i>, values>` only when
/// every child dictionary shares the exact same values array allocation.
#[derive(Debug)]
struct DictionaryChunkedValuesPullUpRule;

impl ArrayParentReduceRule<Dict> for DictionaryChunkedValuesPullUpRule {
type Parent = Chunked;

fn reduce_parent(
&self,
array: ArrayView<'_, Dict>,
parent: ArrayView<'_, Chunked>,
_child_idx: usize,
) -> VortexResult<Option<ArrayRef>> {
let values = array.values();
let codes_dtype = array.codes().dtype().clone();
let mut code_chunks = Vec::with_capacity(parent.nchunks());
let mut all_values_referenced = array.has_all_values_referenced();

for chunk in parent.iter_chunks() {
let Some(dict) = chunk.as_opt::<Dict>() else {
return Ok(None);
};
if dict.codes().dtype() != &codes_dtype {
return Ok(None);
}
if !ArrayRef::ptr_eq(dict.values(), values) {
return Ok(None);
}
all_values_referenced |= dict.has_all_values_referenced();
code_chunks.push(dict.codes().clone());
}

let codes = ChunkedArray::try_new(code_chunks, codes_dtype)?.into_array();
let dict = DictArray::try_new(codes, values.clone())?;
let dict = if all_values_referenced {
unsafe { dict.set_all_values_referenced(true) }
} else {
dict
};

Ok(Some(dict.into_array()))
}
}

/// Push down a scalar function to run only over the values of a dictionary array.
#[derive(Debug)]
struct DictionaryScalarFnValuesPushDownRule;
Expand Down Expand Up @@ -214,16 +265,72 @@ mod tests {
use vortex_buffer::buffer;
use vortex_error::VortexResult;

use crate::ArrayRef;
use crate::IntoArray;
use crate::arrays::BoolArray;
use crate::arrays::Chunked;
use crate::arrays::ChunkedArray;
use crate::arrays::Dict;
use crate::arrays::DictArray;
use crate::arrays::PrimitiveArray;
use crate::arrays::chunked::ChunkedArrayExt;
use crate::arrays::dict::DictArrayExt;
use crate::arrays::dict::DictArraySlotsExt;
use crate::arrays::scalar_fn::ScalarFnFactoryExt;
use crate::assert_arrays_eq;
use crate::executor::VortexSessionExecute;
use crate::optimizer::ArrayOptimizer;
use crate::scalar_fn::EmptyOptions;
use crate::scalar_fn::fns::not::Not;

#[test]
fn chunked_dict_with_shared_values_pulls_values_up() -> VortexResult<()> {
let values = buffer![10u32, 20, 30].into_array();
let chunk0 = DictArray::try_new(buffer![0u8, 1].into_array(), values.clone())?.into_array();
let chunk1 =
DictArray::try_new(buffer![2u8, 0, 1].into_array(), values.clone())?.into_array();
let array =
ChunkedArray::try_new(vec![chunk0, chunk1], values.dtype().clone())?.into_array();

let optimized = array.optimize()?;
let dict = optimized.as_::<Dict>();
let codes = dict.codes().as_::<Chunked>();

assert!(ArrayRef::ptr_eq(dict.values(), &values));
assert_eq!(codes.nchunks(), 2);
let mut ctx = crate::LEGACY_SESSION.create_execution_ctx();
assert_arrays_eq!(
optimized,
PrimitiveArray::from_iter([10u32, 20, 30, 10, 20]),
&mut ctx
);

Ok(())
}

#[test]
fn chunked_dict_with_distinct_values_stays_chunked() -> VortexResult<()> {
let values0 = buffer![10u32, 20, 30].into_array();
let values1 = buffer![10u32, 20, 30].into_array();
let chunk0 =
DictArray::try_new(buffer![0u8, 1].into_array(), values0.clone())?.into_array();
let chunk1 = DictArray::try_new(buffer![2u8, 0, 1].into_array(), values1)?.into_array();
let array =
ChunkedArray::try_new(vec![chunk0, chunk1], values0.dtype().clone())?.into_array();

let optimized = array.optimize()?;

assert!(optimized.is::<Chunked>());
let mut ctx = crate::LEGACY_SESSION.create_execution_ctx();
assert_arrays_eq!(
optimized,
PrimitiveArray::from_iter([10u32, 20, 30, 10, 20]),
&mut ctx
);

Ok(())
}

#[test]
fn scalar_fn_values_pushdown_preserves_all_values_referenced() -> VortexResult<()> {
let dict = unsafe {
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/arrays/dict/vtable/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use vortex_session::VortexSession;

use crate::ArrayVTable;
use crate::arrays::Chunked;
use crate::arrays::Dict;
use crate::arrays::dict::TakeExecuteAdaptor;
use crate::optimizer::kernels::ArrayKernelsExt;
Expand All @@ -16,6 +17,7 @@ use crate::scalar_fn::fns::fill_null::FillNullExecuteAdaptor;
pub(crate) fn initialize(session: &VortexSession) {
let kernels = session.kernels();
kernels.register_execute_parent_kernel(Binary.id(), Dict, CompareExecuteAdaptor(Dict));
kernels.register_execute_parent_kernel(Dict.id(), Chunked, TakeExecuteAdaptor(Chunked));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this exist before and was not registered?

kernels.register_execute_parent_kernel(Dict.id(), Dict, TakeExecuteAdaptor(Dict));
kernels.register_execute_parent_kernel(FillNull.id(), Dict, FillNullExecuteAdaptor(Dict));
}
Loading