Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion vortex-bench/src/clickbench/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl ClickBenchBenchmark {
}
}

/// ClickBench sorted by event date and event time.
/// ClickBench sorted by event time, with shard filenames shuffled to exercise sort pushdown.
pub struct ClickBenchSortedBenchmark {
pub queries_file: Option<String>,
pub data_url: Url,
Expand Down
88 changes: 83 additions & 5 deletions vortex-bench/src/clickbench/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ use crate::datasets::data_downloads::download_data;
use crate::datasets::data_downloads::download_many;
use crate::utils::file::temp_download_filepath;

/// Benchmark and local data directory name for ClickBench sorted by event date/time.
/// Benchmark and local data directory name for ClickBench sorted by event time.
pub const CLICKBENCH_SORTED_NAME: &str = "clickbench-sorted";
const CLICKBENCH_PARTITIONED_NAME: &str = "clickbench_partitioned";
const SORTED_SHARD_COUNT: usize = 100;
const SORTED_SHARD_COUNT_U64: u64 = 100;
const SORTED_SHARD_FILENAME_WIDTH: usize = 3;
const SORTED_SHARD_PERMUTATION_MULTIPLIER: u64 = 37;
const SORTED_SHARD_PERMUTATION_OFFSET: u64 = 17;

/// Zero-based ClickBench query IDs that filter by or order/group on `EventDate`/`EventTime`.
pub const CLICKBENCH_SORTED_QUERY_IDS: &[usize] = &[23, 24, 26, 36, 37, 38, 39, 40, 41, 42];
Expand Down Expand Up @@ -235,13 +238,18 @@ impl Flavor {
}
}

/// Generate globally sorted ClickBench Parquet shards under `basepath`.
/// Generate sorted ClickBench Parquet shards under `basepath`.
///
/// Each shard contains a contiguous `EventTime` range, but the shard filenames are deterministically
/// shuffled. That makes ordinary file listing order bad for ascending TopK queries, so file sort
/// pushdown has a visible job to do.
pub async fn generate_sorted_clickbench(basepath: impl AsRef<Path>) -> anyhow::Result<()> {
let basepath = basepath.as_ref();
let source_base = CLICKBENCH_PARTITIONED_NAME.to_data_path();
Flavor::Partitioned.download(&source_base).await?;

let source_parquet_dir = source_base.join(Format::Parquet.name());
let output_parquet_dir = basepath.as_ref().join(Format::Parquet.name());
let output_parquet_dir = basepath.join(Format::Parquet.name());

if output_parquet_dir.exists() {
info!(
Expand Down Expand Up @@ -374,7 +382,7 @@ PRAGMA temp_directory={temp_dir};
CREATE TABLE hits_sorted AS
SELECT *
FROM read_parquet({source_glob})
ORDER BY \"EventDate\", \"EventTime\", \"WatchID\";
ORDER BY \"EventTime\";
",
temp_dir = sql_string_literal(&duckdb_temp_dir.display().to_string()),
source_glob = sql_string_literal(&source_glob.display().to_string()),
Expand All @@ -383,7 +391,7 @@ CREATE TABLE hits_sorted AS
for shard_idx in 0..SORTED_SHARD_COUNT_U64 {
let start = shard_idx * rows_per_shard;
let end = (start + rows_per_shard).min(source_rows);
let output_path = output_parquet_dir.join(format!("hits_{shard_idx}.parquet"));
let output_path = output_parquet_dir.join(sorted_shard_file_name(shard_idx));
script.push_str(&format!(
"\
COPY (
Expand All @@ -400,6 +408,21 @@ COPY (
script
}

fn sorted_shard_file_name(sorted_shard_idx: u64) -> String {
debug_assert!(sorted_shard_idx < SORTED_SHARD_COUNT_U64);
let listing_shard_idx = sorted_shard_listing_idx(sorted_shard_idx);
format!(
"hits_{listing_shard_idx:0width$}.parquet",
width = SORTED_SHARD_FILENAME_WIDTH
)
}

fn sorted_shard_listing_idx(sorted_shard_idx: u64) -> u64 {
debug_assert!(sorted_shard_idx < SORTED_SHARD_COUNT_U64);
(sorted_shard_idx * SORTED_SHARD_PERMUTATION_MULTIPLIER + SORTED_SHARD_PERMUTATION_OFFSET)
% SORTED_SHARD_COUNT_U64
}

fn parquet_dir_row_count(parquet_dir: &Path) -> anyhow::Result<u64> {
let files = parquet_files(parquet_dir)?;
anyhow::ensure!(
Expand Down Expand Up @@ -450,3 +473,58 @@ fn sql_string_literal(value: &str) -> String {
fn quote_identifier(value: &str) -> String {
format!("\"{}\"", value.replace('"', "\"\""))
}

#[cfg(test)]
mod tests {
use vortex::utils::aliases::hash_set::HashSet;

use super::*;

#[test]
fn sorted_clickbench_shard_names_use_deterministic_shuffle() {
let names = (0..SORTED_SHARD_COUNT_U64)
.map(sorted_shard_file_name)
.collect::<Vec<_>>();

assert_eq!(names[0], "hits_017.parquet");
assert_eq!(names[1], "hits_054.parquet");
assert_eq!(names[2], "hits_091.parquet");
assert_eq!(names[99], "hits_080.parquet");

let unique_names = names.iter().collect::<HashSet<_>>();
assert_eq!(unique_names.len(), SORTED_SHARD_COUNT);

let mut sorted_names = names.clone();
sorted_names.sort();
assert_ne!(names, sorted_names);

let mut reverse_names = sorted_names;
reverse_names.reverse();
assert_ne!(names, reverse_names);
}

#[test]
fn sorted_clickbench_script_sorts_for_topk_and_writes_shuffled_shards() {
let script = sorted_clickbench_duckdb_script(
Path::new("source"),
Path::new("out"),
Path::new("tmp"),
1000,
);

assert!(script.contains("ORDER BY \"EventTime\";"));

let first = script
.find("hits_017.parquet")
.expect("first sorted shard should use its shuffled shard name");
let second = script
.find("hits_054.parquet")
.expect("second sorted shard should use its shuffled shard name");
let third = script
.find("hits_091.parquet")
.expect("third sorted shard should use its shuffled shard name");

assert!(first < second);
assert!(second < third);
}
}
Loading