diff --git a/vortex-bench/src/clickbench/benchmark.rs b/vortex-bench/src/clickbench/benchmark.rs index 2885dc29c3b..c036d1c084b 100644 --- a/vortex-bench/src/clickbench/benchmark.rs +++ b/vortex-bench/src/clickbench/benchmark.rs @@ -41,7 +41,7 @@ impl ClickBenchBenchmark { } } -/// ClickBench sorted by event date and event time. +/// ClickBench sorted by event time, with shard filenames shuffled to exercise sort pushdown. pub struct ClickBenchSortedBenchmark { pub queries_file: Option, pub data_url: Url, diff --git a/vortex-bench/src/clickbench/data.rs b/vortex-bench/src/clickbench/data.rs index 7347e9fa1ba..bd570d6536c 100644 --- a/vortex-bench/src/clickbench/data.rs +++ b/vortex-bench/src/clickbench/data.rs @@ -34,11 +34,14 @@ use crate::datasets::data_downloads::download_data; use crate::datasets::data_downloads::download_many; use crate::utils::file::temp_download_filepath; -/// Benchmark and local data directory name for ClickBench sorted by event date/time. +/// Benchmark and local data directory name for ClickBench sorted by event time. pub const CLICKBENCH_SORTED_NAME: &str = "clickbench-sorted"; const CLICKBENCH_PARTITIONED_NAME: &str = "clickbench_partitioned"; const SORTED_SHARD_COUNT: usize = 100; const SORTED_SHARD_COUNT_U64: u64 = 100; +const SORTED_SHARD_FILENAME_WIDTH: usize = 3; +const SORTED_SHARD_PERMUTATION_MULTIPLIER: u64 = 37; +const SORTED_SHARD_PERMUTATION_OFFSET: u64 = 17; /// Zero-based ClickBench query IDs that filter by or order/group on `EventDate`/`EventTime`. pub const CLICKBENCH_SORTED_QUERY_IDS: &[usize] = &[23, 24, 26, 36, 37, 38, 39, 40, 41, 42]; @@ -235,13 +238,18 @@ impl Flavor { } } -/// Generate globally sorted ClickBench Parquet shards under `basepath`. +/// Generate sorted ClickBench Parquet shards under `basepath`. +/// +/// Each shard contains a contiguous `EventTime` range, but the shard filenames are deterministically +/// shuffled. That makes ordinary file listing order bad for ascending TopK queries, so file sort +/// pushdown has a visible job to do. pub async fn generate_sorted_clickbench(basepath: impl AsRef) -> anyhow::Result<()> { + let basepath = basepath.as_ref(); let source_base = CLICKBENCH_PARTITIONED_NAME.to_data_path(); Flavor::Partitioned.download(&source_base).await?; let source_parquet_dir = source_base.join(Format::Parquet.name()); - let output_parquet_dir = basepath.as_ref().join(Format::Parquet.name()); + let output_parquet_dir = basepath.join(Format::Parquet.name()); if output_parquet_dir.exists() { info!( @@ -374,7 +382,7 @@ PRAGMA temp_directory={temp_dir}; CREATE TABLE hits_sorted AS SELECT * FROM read_parquet({source_glob}) - ORDER BY \"EventDate\", \"EventTime\", \"WatchID\"; + ORDER BY \"EventTime\"; ", temp_dir = sql_string_literal(&duckdb_temp_dir.display().to_string()), source_glob = sql_string_literal(&source_glob.display().to_string()), @@ -383,7 +391,7 @@ CREATE TABLE hits_sorted AS for shard_idx in 0..SORTED_SHARD_COUNT_U64 { let start = shard_idx * rows_per_shard; let end = (start + rows_per_shard).min(source_rows); - let output_path = output_parquet_dir.join(format!("hits_{shard_idx}.parquet")); + let output_path = output_parquet_dir.join(sorted_shard_file_name(shard_idx)); script.push_str(&format!( "\ COPY ( @@ -400,6 +408,21 @@ COPY ( script } +fn sorted_shard_file_name(sorted_shard_idx: u64) -> String { + debug_assert!(sorted_shard_idx < SORTED_SHARD_COUNT_U64); + let listing_shard_idx = sorted_shard_listing_idx(sorted_shard_idx); + format!( + "hits_{listing_shard_idx:0width$}.parquet", + width = SORTED_SHARD_FILENAME_WIDTH + ) +} + +fn sorted_shard_listing_idx(sorted_shard_idx: u64) -> u64 { + debug_assert!(sorted_shard_idx < SORTED_SHARD_COUNT_U64); + (sorted_shard_idx * SORTED_SHARD_PERMUTATION_MULTIPLIER + SORTED_SHARD_PERMUTATION_OFFSET) + % SORTED_SHARD_COUNT_U64 +} + fn parquet_dir_row_count(parquet_dir: &Path) -> anyhow::Result { let files = parquet_files(parquet_dir)?; anyhow::ensure!( @@ -450,3 +473,58 @@ fn sql_string_literal(value: &str) -> String { fn quote_identifier(value: &str) -> String { format!("\"{}\"", value.replace('"', "\"\"")) } + +#[cfg(test)] +mod tests { + use vortex::utils::aliases::hash_set::HashSet; + + use super::*; + + #[test] + fn sorted_clickbench_shard_names_use_deterministic_shuffle() { + let names = (0..SORTED_SHARD_COUNT_U64) + .map(sorted_shard_file_name) + .collect::>(); + + assert_eq!(names[0], "hits_017.parquet"); + assert_eq!(names[1], "hits_054.parquet"); + assert_eq!(names[2], "hits_091.parquet"); + assert_eq!(names[99], "hits_080.parquet"); + + let unique_names = names.iter().collect::>(); + assert_eq!(unique_names.len(), SORTED_SHARD_COUNT); + + let mut sorted_names = names.clone(); + sorted_names.sort(); + assert_ne!(names, sorted_names); + + let mut reverse_names = sorted_names; + reverse_names.reverse(); + assert_ne!(names, reverse_names); + } + + #[test] + fn sorted_clickbench_script_sorts_for_topk_and_writes_shuffled_shards() { + let script = sorted_clickbench_duckdb_script( + Path::new("source"), + Path::new("out"), + Path::new("tmp"), + 1000, + ); + + assert!(script.contains("ORDER BY \"EventTime\";")); + + let first = script + .find("hits_017.parquet") + .expect("first sorted shard should use its shuffled shard name"); + let second = script + .find("hits_054.parquet") + .expect("second sorted shard should use its shuffled shard name"); + let third = script + .find("hits_091.parquet") + .expect("third sorted shard should use its shuffled shard name"); + + assert!(first < second); + assert!(second < third); + } +}