Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
930 changes: 635 additions & 295 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ geo = "0.31.0"
geo-traits = "0.3.0"
geo-types = "0.7.19"
geoarrow = "0.8.0"
geoarrow-cast = "0.8.0"
get_dir = "0.5.0"
glob = "0.3.2"
goldenfile = "1"
Expand Down Expand Up @@ -241,6 +242,14 @@ similar = "3.0.0"
sketches-ddsketch = "0.4.0"
smallvec = "1.15.1"
smol = "2.0.2"
spatialbench = "0.2"
spatialbench-arrow = "0.2"
# spatialbench still pins arrow 56, two majors behind the workspace arrow. Until upstream
# catches up, write its generated batches with a matching parquet instead of converting
# arrow versions at the boundary.
spatialbench-parquet = { package = "parquet", version = "56", features = [
"async",
] }
static_assertions = "1.1"
strum = "0.28"
syn = { version = "2.0.117", features = ["full"] }
Expand Down
22 changes: 22 additions & 0 deletions benchmarks/duckdb-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub struct DuckClient {
connection: Option<Connection>,
pub db_path: PathBuf,
pub threads: Option<usize>,
/// Replayed on every (re)open, since extensions load per instance. Currently
/// `INSTALL spatial; LOAD spatial;` for SpatialBench.
init_sql: Vec<String>,
}

impl DuckClient {
Expand Down Expand Up @@ -68,9 +71,19 @@ impl DuckClient {
connection: Some(connection),
db_path,
threads,
init_sql: Vec::new(),
})
}

/// Run `statements` now and after every subsequent [`DuckClient::reopen`].
pub fn set_init_sql(&mut self, statements: Vec<String>) -> Result<()> {
for stmt in &statements {
self.connection().query(stmt)?;
}
self.init_sql = statements;
Ok(())
}

pub fn open_and_setup_database(
path: Option<PathBuf>,
threads: Option<usize>,
Expand Down Expand Up @@ -118,6 +131,14 @@ impl DuckClient {
self.db = Some(db);
self.connection = Some(connection);

// Replay init SQL (e.g. LOAD spatial) — extensions are per-instance.
for stmt in &self.init_sql {
self.connection
.as_ref()
.vortex_expect("connection just opened")
.query(stmt)?;
}

Ok(())
}

Expand All @@ -133,6 +154,7 @@ impl DuckClient {
connection: Some(connection),
db_path,
threads: None,
init_sql: Vec::new(),
})
}

Expand Down
8 changes: 6 additions & 2 deletions benchmarks/duckdb-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,13 @@ fn main() -> anyhow::Result<()> {
&filtered_queries,
mode,
|format| {
let ctx = DuckClient::new(
let mut ctx = DuckClient::new(
&*benchmark,
format,
args.delete_duckdb_database,
args.threads,
)?;
ctx.set_init_sql(benchmark.engine_init_sql(Engine::DuckDB))?;
ctx.register_tables(&*benchmark, format)?;

// Duckdb doesn't support octet_length for strings but we need this
Expand All @@ -196,7 +197,10 @@ fn main() -> anyhow::Result<()> {
if !args.reuse {
ctx.reopen()?;
}
ctx.execute_query_result(query)
// Adapt the query to how this format surfaces its columns (e.g. SpatialBench geometry:
// `GEOMETRY` for Vortex vs. WKB `BLOB` for Parquet).
let query = benchmark.query_for_format(query, format);
ctx.execute_query_result(&query)
},
)?;

Expand Down
8 changes: 8 additions & 0 deletions vortex-array/src/aggregate_fn/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::aggregate_fn::AggregateFn;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnVTable;
use crate::dtype::DType;

/// Reference-counted pointer to an aggregate function plugin.
pub type AggregateFnPluginRef = Arc<dyn AggregateFnPlugin>;
Expand All @@ -28,6 +29,9 @@ pub trait AggregateFnPlugin: 'static + Send + Sync {
/// Deserialize an aggregate function from serialized metadata.
fn deserialize(&self, metadata: &[u8], session: &VortexSession)
-> VortexResult<AggregateFnRef>;

/// The default per-chunk zone statistic to store for a column of `input_dtype`, or `None` if this aggregate isn't one.
fn zone_stat_default(&self, input_dtype: &DType) -> Option<AggregateFnRef>;
}

impl std::fmt::Debug for dyn AggregateFnPlugin {
Expand All @@ -51,4 +55,8 @@ impl<V: AggregateFnVTable> AggregateFnPlugin for V {
let options = AggregateFnVTable::deserialize(self, metadata, session)?;
Ok(AggregateFn::new(self.clone(), options).erased())
}

fn zone_stat_default(&self, input_dtype: &DType) -> Option<AggregateFnRef> {
AggregateFnVTable::zone_stat_default(self, input_dtype)
}
}
13 changes: 13 additions & 0 deletions vortex-array/src/aggregate_fn/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use vortex_session::SessionVar;

use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnPluginRef;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::fns::all_nan::AllNan;
use crate::aggregate_fn::fns::all_non_distinct::AllNonDistinct;
Expand Down Expand Up @@ -43,6 +44,7 @@ use crate::arrays::chunked::compute::aggregate::ChunkedArrayAggregate;
use crate::arrays::dict::compute::is_constant::DictIsConstantKernel;
use crate::arrays::dict::compute::is_sorted::DictIsSortedKernel;
use crate::arrays::dict::compute::min_max::DictMinMaxKernel;
use crate::dtype::DType;

/// Session state for aggregate functions and encoding-specific aggregate kernels.
///
Expand Down Expand Up @@ -133,6 +135,17 @@ impl AggregateFnSession {
self.registry.insert(id, pluginref);
}

/// The default per-chunk zone statistics for a column of `input_dtype`, collected from every
/// registered aggregate's `zone_stat_default`.
pub fn zone_stat_defaults(&self, input_dtype: &DType) -> Vec<AggregateFnRef> {
self.registry.read(|registry| {
registry
.values()
.filter_map(|plugin| plugin.zone_stat_default(input_dtype))
.collect()
})
}

/// Returns the aggregate kernel registered for `array_id` and `agg_fn_id`, if any.
///
/// Lookup first checks for a kernel registered for the exact aggregate function, then falls
Expand Down
7 changes: 7 additions & 0 deletions vortex-array/src/aggregate_fn/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ pub trait AggregateFnVTable: 'static + Sized + Clone + Send + Sync {
/// Returns `None` if the aggregate function cannot be applied to the input dtype.
fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType>;

/// If this aggregate should be computed as a default zone statistic for `input_dtype`, return
/// the bound aggregate to store. Default: not a zone-map default.
fn zone_stat_default(&self, input_dtype: &DType) -> Option<AggregateFnRef> {
let _ = input_dtype;
None
}

/// DType of the intermediate partial accumulator state.
///
/// Use a struct dtype when multiple fields are needed
Expand Down
6 changes: 6 additions & 0 deletions vortex-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ vortex = { workspace = true, features = [
"tokio",
"zstd",
] }
vortex-geo = { workspace = true }
vortex-tensor = { workspace = true } # TODO(connor): In the future, this might be inside vortex.

anyhow = { workspace = true }
Expand All @@ -33,6 +34,8 @@ async-trait = { workspace = true }
bzip2 = { workspace = true }
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true }
geoarrow = { workspace = true }
geoarrow-cast = { workspace = true }
get_dir = { workspace = true }
glob = { workspace = true }
humansize = { workspace = true }
Expand All @@ -48,6 +51,9 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["stream"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
spatialbench = { workspace = true }
spatialbench-arrow = { workspace = true }
spatialbench-parquet = { workspace = true }
sysinfo = { workspace = true }
tabled = { workspace = true, features = ["std"] }
target-lexicon = { workspace = true }
Expand Down
Loading