Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ pub fn format_to_df_format(format: Format) -> Arc<dyn FileFormat> {
}

fn vortex_table_options() -> VortexTableOptions {
VortexTableOptions {
projection_pushdown: true,
predicate_pushdown: true,
..Default::default()
}
let mut opts = VortexTableOptions::default();

opts.predicate_pushdown = true;
opts.predicate_pushdown = true;

opts
}
157 changes: 119 additions & 38 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use datafusion_common::GetExt;
use datafusion_common::Result as DFResult;
use datafusion_common::ScalarValue as DFScalarValue;
use datafusion_common::Statistics;
use datafusion_common::config::ConfigExtension;
use datafusion_common::config::ConfigField;
use datafusion_common::config_namespace;
use datafusion_common::extensions_options;
use datafusion_common::internal_datafusion_err;
use datafusion_common::not_impl_err;
use datafusion_common::parsers::CompressionTypeVariant;
Expand Down Expand Up @@ -132,28 +133,62 @@ impl Debug for VortexFormat {
}
}

config_namespace! {
extensions_options! {
/// Options to configure [`VortexFormat`] and [`VortexSource`].
///
/// These options are usually set on a [`VortexFormatFactory`] and inherited
/// by the `VortexFormat` / `VortexSource` instances created for individual
/// tables.
/// The API follows DataFusion's built-in Parquet and JSON format factories:
/// a format factory may carry customized defaults, the session may carry
/// format defaults, and `CREATE EXTERNAL TABLE ... OPTIONS(...)` can
/// override individual fields for one table.
///
/// [`FileFormatFactory::create`] builds the `VortexTableOptions` copied into
/// each [`VortexFormat`] as follows:
///
/// 1. If the factory has explicit options from
/// [`VortexFormatFactory::with_options`] or
/// [`VortexFormatFactory::new_with_options`], start from that complete
/// `VortexTableOptions` value. This matches
/// [`ParquetFormatFactory::new_with_options`] and
/// [`JsonFormatFactory::new_with_options`]: factory options replace
/// session defaults; they are not merged with them field-by-field.
/// 2. If the factory does not have explicit options, read the session's
/// `vortex` extension at the time `create` is called. This is the value
/// changed by `SET vortex.<option> = ...`.
/// 3. If the session has no `vortex` extension, start from
/// `VortexTableOptions::default()`.
/// 4. Apply table `OPTIONS(...)` last. Each option overwrites only its
/// matching field, so per-table settings can override either the factory
/// options or the session/default value.
///
/// In SQL, session settings use the `vortex.` prefix. Table options use the
/// field names directly, the same style as Parquet or JSON table options:
///
/// ```text
/// SET vortex.predicate_pushdown = false;
///
/// CREATE EXTERNAL TABLE t (x BIGINT)
/// STORED AS vortex
/// LOCATION 's3://bucket/path/'
/// OPTIONS(predicate_pushdown 'true');
/// ```
///
/// # Example
///
/// ```rust
/// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
///
/// let factory = VortexFormatFactory::new().with_options(VortexTableOptions {
/// projection_pushdown: true,
/// predicate_pushdown: true,
/// scan_concurrency: Some(8),
/// ..Default::default()
/// });
/// let mut options = VortexTableOptions::default();
/// options.predicate_pushdown = true;
/// options.projection_pushdown = true;
/// options.scan_concurrency = Some(8);
///
/// let factory = VortexFormatFactory::new().with_options(options);
/// # let _ = factory;
/// ```
///
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
/// [`ParquetFormatFactory::new_with_options`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/parquet/struct.ParquetFormatFactory.html#method.new_with_options
/// [`JsonFormatFactory::new_with_options`]: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/json/struct.JsonFormatFactory.html#method.new_with_options
pub struct VortexTableOptions {
/// The number of bytes to read when parsing a file footer.
///
Expand All @@ -165,33 +200,46 @@ config_namespace! {
/// When enabled, projection expressions may be partially evaluated during
/// the scan. When disabled, Vortex reads only the referenced columns and
/// all expressions are evaluated after the scan.
///
/// Disabled by default.
pub projection_pushdown: bool, default = false
/// Whether to enable predicate pushdown into the underlying Vortex scan.
///
/// When enabled, supported filters are evaluated during the scan. When
/// disabled, DataFusion evaluates filters after the scan, while
/// `VortexSource` can still use the full predicate for file pruning.
///
/// Enabled by default.
pub predicate_pushdown: bool, default = true
/// The intra-partition scan concurrency, controlling the number of row splits to process
/// concurrently per-thread within each file.
///
/// This does not affect the overall parallelism
/// across partitions, which is controlled by DataFusion's execution configuration.
///
/// Leave as `None` to use Vortex's scan default. Override per session
/// with `SET vortex.scan_concurrency = <n>`, or per table with
/// `OPTIONS(scan_concurrency '<n>')`.
pub scan_concurrency: Option<usize>, default = None
}
}

impl Eq for VortexTableOptions {}
impl ConfigExtension for VortexTableOptions {
const PREFIX: &'static str = "vortex";
}

/// Registration entry point for the file-backed Vortex integration.
///
/// `VortexFormatFactory` is the type most applications use. Register it with a
/// DataFusion session, and DataFusion will create [`VortexFormat`] values for
/// `CREATE EXTERNAL TABLE`, [`ListingTable`], and URL-table scans.
///
/// The factory stores a [`VortexSession`] and default [`VortexTableOptions`].
/// Those defaults are copied into the formats and sources created for each
/// table.
/// The factory stores a [`VortexSession`] and optional factory-level
/// [`VortexTableOptions`]. When options are set on the factory they act like
/// customized format defaults, matching DataFusion's Parquet and JSON factory
/// APIs. Otherwise, `VortexFormatFactory::create` uses the session's `vortex`
/// options. In both cases, table `OPTIONS(...)` are applied last for the table
/// being created.
///
/// # Example
///
Expand All @@ -203,11 +251,11 @@ impl Eq for VortexTableOptions {}
/// use datafusion_common::GetExt;
/// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
///
/// let factory = Arc::new(VortexFormatFactory::new().with_options(VortexTableOptions {
/// projection_pushdown: true,
/// predicate_pushdown: true,
/// ..Default::default()
/// }));
/// let mut options = VortexTableOptions::default();
/// options.predicate_pushdown = true;
/// options.projection_pushdown = true;
///
/// let factory = Arc::new(VortexFormatFactory::new().with_options(options));
///
/// let mut state_builder = SessionStateBuilder::new()
/// .with_default_features()
Expand Down Expand Up @@ -235,7 +283,12 @@ impl GetExt for VortexFormatFactory {
}

impl VortexFormatFactory {
/// Creates a factory with a default [`VortexSession`] and default options.
/// Creates a factory with a default [`VortexSession`] and no factory-level
/// options.
///
/// Formats created by this factory start from the session's `vortex`
/// options, or from [`VortexTableOptions::default`] if the session does not
/// contain them. Table-level `OPTIONS(...)` are still applied last.
#[expect(
clippy::new_without_default,
reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
Expand All @@ -247,33 +300,37 @@ impl VortexFormatFactory {
}
}

/// Creates a factory with an explicit session and default options.
/// Creates a factory with an explicit session and factory-level options.
///
/// The supplied options become the baseline for every [`VortexFormat`]
/// created by this factory. DataFusion may still override them with
/// table-level options passed into [`FileFormatFactory::create`].
/// The supplied options become the complete starting value for every
/// [`VortexFormat`] created by this factory. Session `SET vortex.*` values
/// are ignored for these formats, matching DataFusion's built-in
/// `new_with_options` factories. Table-level `OPTIONS(...)` are still
/// applied last.
pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
Self {
session,
options: Some(options),
}
}

/// Overrides the default options for this factory.
/// Sets factory-level options.
///
/// This is the usual way to turn on features such as projection pushdown for
/// every table created through the factory.
/// This is the usual way to customize Vortex defaults for every table
/// created through the factory. These options replace, rather than merge
/// with, session `SET vortex.*` values. Table-level `OPTIONS(...)` are still
/// applied last.
///
/// # Example
///
/// ```rust
/// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
///
/// let factory = VortexFormatFactory::new().with_options(VortexTableOptions {
/// projection_pushdown: true,
/// predicate_pushdown: true,
/// ..Default::default()
/// });
/// let mut options = VortexTableOptions::default();
/// options.predicate_pushdown = true;
/// options.projection_pushdown = true;
///
/// let factory = VortexFormatFactory::new().with_options(options);
/// # let _ = factory;
/// ```
pub fn with_options(mut self, options: VortexTableOptions) -> Self {
Expand All @@ -286,13 +343,32 @@ impl FileFormatFactory for VortexFormatFactory {
#[expect(clippy::disallowed_types, reason = "required by trait signature")]
fn create(
&self,
_state: &dyn Session,
state: &dyn Session,
format_options: &std::collections::HashMap<String, String>,
) -> DFResult<Arc<dyn FileFormat>> {
let mut opts = self.options.clone().unwrap_or_default();
// This mirrors DataFusion's Parquet/JSON file-format factories:
//
// 1. Factory options are a complete customized default when present.
// 2. Without factory options, use the session's `vortex` extension
// (`SET vortex.* = ...`), falling back to built-in defaults.
// 3. Table-level `CREATE EXTERNAL TABLE ... OPTIONS(...)` values apply
// last. DataFusion prefixes file-format options with `format.`
// before passing them to this factory; SQL users write the field
// name directly, e.g. `OPTIONS(predicate_pushdown 'false')`.
let mut opts = self
.options
.clone()
.or_else(|| {
state
.config_options()
.extensions
.get::<VortexTableOptions>()
.cloned()
})
.unwrap_or_default();
for (key, value) in format_options {
if let Some(key) = key.strip_prefix("format.") {
opts.set(key, value)?;
ConfigField::set(&mut opts, key, value)?;
} else {
tracing::trace!("Ignoring option '{key}'");
}
Expand Down Expand Up @@ -698,7 +774,7 @@ mod tests {
#[test]
fn format_plumbs_footer_initial_read_size() {
let mut opts = VortexTableOptions::default();
opts.set("footer_initial_read_size_bytes", "12345").unwrap();
ConfigField::set(&mut opts, "footer_initial_read_size_bytes", "12345").unwrap();

let format = VortexFormat::new_with_options(VortexSession::default(), opts);
assert_eq!(format.options().footer_initial_read_size_bytes, 12345);
Expand All @@ -720,7 +796,12 @@ mod tests {
.downcast_ref::<VortexSource>()
.ok_or_else(|| anyhow::anyhow!("expected VortexSource"))?;

assert_eq!(source.options(), &opts);
assert_eq!(
source.options().projection_pushdown,
opts.projection_pushdown
);
assert_eq!(source.options().predicate_pushdown, opts.predicate_pushdown);
assert_eq!(source.options().scan_concurrency, opts.scan_concurrency);
Ok(())
}
}
4 changes: 4 additions & 0 deletions vortex-sqllogictest/bin/sqllogictests-runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::LazyLock;
use datafusion::common::GetExt;
use datafusion::datasource::provider::DefaultTableFactory;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionConfig;
use datafusion::prelude::SessionContext;
use datafusion_sqllogictest::DataFusion;
use datafusion_sqllogictest::df_value_validator;
Expand All @@ -21,6 +22,7 @@ use sqllogictest::harness::Failed;
use sqllogictest::harness::Trial;
use sqllogictest::strict_column_validator;
use vortex_datafusion::VortexFormatFactory;
use vortex_datafusion::VortexTableOptions;
use vortex_sqllogictest::duckdb::DuckDB;
use vortex_sqllogictest::duckdb::duckdb_validator;
use vortex_sqllogictest::normalize::PathNormalizing;
Expand Down Expand Up @@ -61,8 +63,10 @@ fn drive_datafusion(path: &Path, work_dir: &Path, mode: Mode) -> anyhow::Result<

let rt = build_runtime()?;
rt.block_on(async {
let config = SessionConfig::default().with_option_extension(VortexTableOptions::default());
let factory = Arc::new(VortexFormatFactory::new());
let session_state_builder = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.with_table_factory(
factory.get_ext().to_uppercase(),
Expand Down
76 changes: 76 additions & 0 deletions vortex-sqllogictest/slt/datafusion/table_options.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright the Vortex contributors

# Example-style coverage for DataFusion Vortex table options.
#
# Session settings provide the baseline for tables created through
# VortexFormatFactory::create. Per-table OPTIONS override that baseline for the
# table being created.

include ../setup.slt.no

statement ok
SET vortex.predicate_pushdown = false;

statement ok
CREATE EXTERNAL TABLE session_disabled_pushdown (
x BIGINT NOT NULL,
y BIGINT NOT NULL
)
STORED AS vortex
LOCATION '${WORK_DIR}/session_disabled_pushdown/';

statement ok
INSERT INTO session_disabled_pushdown VALUES
(1, 10),
(2, 20),
(3, 30);

query TT
EXPLAIN SELECT y FROM session_disabled_pushdown WHERE x > 1 ORDER BY y;
----
logical_plan
01)Sort: session_disabled_pushdown.y ASC NULLS LAST
02)--Projection: session_disabled_pushdown.y
03)----Filter: session_disabled_pushdown.x > Int64(1)
04)------TableScan: session_disabled_pushdown projection=[x, y], partial_filters=[session_disabled_pushdown.x > Int64(1)]
physical_plan
01)SortExec: expr=[y@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--FilterExec: x@0 > 1, projection=[y@1]
03)----DataSourceExec: file_groups={<slt:ignore>}, projection=[x, y], file_type=vortex

statement ok
CREATE EXTERNAL TABLE table_enabled_pushdown (
x BIGINT NOT NULL,
y BIGINT NOT NULL
)
STORED AS vortex
LOCATION '${WORK_DIR}/table_enabled_pushdown/'
OPTIONS(predicate_pushdown 'true');

statement ok
INSERT INTO table_enabled_pushdown VALUES
(1, 10),
(2, 20),
(3, 30);

query TT
EXPLAIN SELECT y FROM table_enabled_pushdown WHERE x > 1 ORDER BY y;
----
logical_plan
01)Sort: table_enabled_pushdown.y ASC NULLS LAST
02)--Projection: table_enabled_pushdown.y
03)----Filter: table_enabled_pushdown.x > Int64(1)
04)------TableScan: table_enabled_pushdown projection=[x, y], partial_filters=[table_enabled_pushdown.x > Int64(1)]
physical_plan
01)SortExec: expr=[y@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--DataSourceExec: file_groups={<slt:ignore>}, projection=[y], file_type=vortex, predicate: x@0 > 1

query I
SELECT y FROM table_enabled_pushdown WHERE x > 1 ORDER BY y;
----
20
30

statement ok
SET vortex.predicate_pushdown = true;
Loading