1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

+- The Calcite aggregate push-down rule now rewrites a whole-table `SUM(col)` to a single-row `Values` computed from the zone-map table (via `VortexTable.zoneSum`), so `SELECT SUM(col)` answers metadata-only with no data segment decoded — joining the existing `MIN`/`MAX`/`COUNT` push-down. It abandons to a normal scan when a zone carries no usable sum (no zone map, or an overflowed zone) or when a `WHERE` filters the aggregate. ([24b64b32](https://github.com/dfa1/vortex-java/commit/24b64b32))
- `ScanIterator.columnZoneStats(column)` surfaces per-zone min/max/sum/null-count from a column's `vortex.stats` zone-map table without decoding any data segment — the read side of aggregate push-down (ADR 0013 §6). `ArrayStats` gains a `sum` component, decoded from the zone-map table (where the Rust reference stores it too), so the Calcite adapter now answers `SUM`/`AVG` metadata-only when every zone carries a sum, falling back to a streaming scan only for columns without a zone map. ([05dd9204](https://github.com/dfa1/vortex-java/commit/05dd9204))
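
The metadata-only `SUM` described in these entries can be pictured with a minimal sketch. It is not the project's `ZoneReducer`: the `ZoneStat` record and `foldSum` helper are hypothetical names, and treating an overflow of the running total as a reason to abandon is an assumption made for the illustration.

```java
import java.util.List;

public class ZoneSumFoldSketch {

    /** Hypothetical per-zone statistic; sum is null when the zone kept no usable sum. */
    record ZoneStat(Long sum) {}

    /**
     * Folds per-zone sums without touching row data.
     * Returns null when statistics alone cannot answer the reduction,
     * which tells the caller to fall back to a scan.
     */
    static Long foldSum(List<ZoneStat> zones) {
        if (zones.isEmpty()) {
            return null; // zero zones: nothing to fold, leave the answer to the scan
        }
        long total = 0L;
        for (ZoneStat zone : zones) {
            if (zone.sum() == null) {
                return null; // one unusable zone invalidates the whole fold
            }
            try {
                total = Math.addExact(total, zone.sum());
            } catch (ArithmeticException overflow) {
                return null; // assumption: an overflowing running total also abandons
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<ZoneStat> complete = List.of(new ZoneStat(10L), new ZoneStat(32L));
        List<ZoneStat> withGap = List.of(new ZoneStat(10L), new ZoneStat(null));
        System.out.println(foldSum(complete)); // prints 42
        System.out.println(foldSum(withGap));  // prints null
    }
}
```

Returning `null` instead of throwing keeps the fallback decision with the caller, which mirrors how the rule is described as abandoning the rewrite.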

### Changed
13 changes: 8 additions & 5 deletions TODO.md
@@ -100,11 +100,14 @@ Per-encoding gotchas:
min/max/sum/null count, decoding sum from the `vortex.stats` zone-map table (matches files from
Rust, whose flat writer omits per-flat sum). Calcite `VortexAggregates.SUM`/`AVG` now fold those
per-zone sums (metadata-only), falling back to a full scan only when a column has no zone map.
-The fold is now a reusable `reader.compute.ZoneReducer.sum(col)` (the seam a future
-`vortex-compute` extracts); Calcite consumes it.
-Next: the residual tier needs a consumer first — wire Calcite aggregate+`WHERE` push-down, then
-add `ZoneReducer` predicate support (whole-zone fold + boundary-zone streaming) and the
-`Mask`/`Predicate`/kernel vocab on top.
+The fold is a reusable `reader.compute.ZoneReducer.sum(col)` (the seam a future `vortex-compute`
+extracts), now consumed by the planner: `VortexAggregatePushDownRule` rewrites a whole-table
+`SUM(col)` to a single-row `Values` via `VortexTable.zoneSum`, abandoning to the scan only when a
+zone carries no usable sum. A `SUM` with a `WHERE` still abandons (whole-zone stats can't answer a
+filtered aggregate) — that is the residual tier below.
+Next: the residual tier — give `ZoneReducer` predicate support (whole-zone fold for fully-selected
+zones + boundary-zone streaming for partially-selected ones), then let the rule push `SUM` with a
+`WHERE`. `Mask`/`Predicate`/kernel vocab on top.
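
The residual tier sketched in the note above (whole-zone fold for fully-selected zones, streaming for boundary zones) might look roughly like the following. This is an illustration of the idea only, not planned API: the `Zone` record, `zoneOf`, and `sumWhereAtLeast` are hypothetical, the predicate is fixed to `col >= threshold` on the summed column itself, and nulls and overflow are ignored.

```java
import java.util.Arrays;
import java.util.List;
import java.util.LongSummaryStatistics;

public class ResidualTierSketch {

    /** Hypothetical zone: min/max/sum statistics plus the rows a real reader would decode lazily. */
    record Zone(long min, long max, long sum, long[] values) {}

    /** Builds a zone and its statistics from raw values (stand-in for a writer). */
    static Zone zoneOf(long... values) {
        LongSummaryStatistics stats = Arrays.stream(values).summaryStatistics();
        return new Zone(stats.getMin(), stats.getMax(), stats.getSum(), values);
    }

    /** Computes SUM(col) WHERE col >= threshold, reading rows only in zones the predicate splits. */
    static long sumWhereAtLeast(List<Zone> zones, long threshold) {
        long total = 0L;
        for (Zone zone : zones) {
            if (zone.max() < threshold) {
                continue; // fully excluded: every row fails the predicate
            }
            if (zone.min() >= threshold) {
                total += zone.sum(); // fully selected: the zone statistic is the answer
                continue;
            }
            for (long value : zone.values()) { // boundary zone: stream and filter rows
                if (value >= threshold) {
                    total += value;
                }
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Zone> zones = List.of(zoneOf(1, 2, 3), zoneOf(4, 5, 6), zoneOf(7, 8, 9));
        // Zone 1 is skipped, zone 2 is streamed (5 + 6), zone 3 is folded from its sum (24).
        System.out.println(sumWhereAtLeast(zones, 5)); // prints 35
    }
}
```

Only the boundary zone touches row data, which is the point of the tier: the cost scales with the number of zones the predicate splits rather than with the table size.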

## Encodings

@@ -25,15 +25,16 @@
 import java.util.ArrayList;
 import java.util.List;

-/// Rewrites a whole-table `MIN`/`MAX`/`COUNT` aggregate over a [VortexTable] into a single-row
-/// [LogicalValues] computed from the footer zone-map statistics — answering the query without
-/// decoding a single data segment (ADR 0013 §6, ADR 0018 Phase 2).
+/// Rewrites a whole-table `MIN`/`MAX`/`COUNT`/`SUM` aggregate over a [VortexTable] into a
+/// single-row [LogicalValues] computed from the footer zone-map statistics — answering the query
+/// without decoding a single data segment (ADR 0013 §6, ADR 0018 Phase 2).
 ///
-/// Fires only when it can answer *every* aggregate from statistics: no `GROUP BY`, and each
-/// call is `COUNT(*)`, `COUNT(col)`, `MIN(col)`, or `MAX(col)` over a numeric column. Anything
-/// else (e.g. `SUM`, a grouped aggregate, `MIN` on a non-numeric column) leaves the plan
-/// untouched for the normal scan path. `SUM`/`AVG` join this tier once the writer emits a
-/// per-zone `SUM` statistic.
+/// Fires only when it can answer *every* aggregate from statistics: no `GROUP BY`, and each call
+/// is `COUNT(*)`, `COUNT(col)`, `MIN(col)`, `MAX(col)`, or `SUM(col)` over a numeric column. `SUM`
+/// folds the per-zone `SUM` rows via [VortexTable#zoneSum(String)]; it abandons (falling back to
+/// the scan) for a column whose zone-map table cannot answer it — no zone map, or an overflowed
+/// zone. Anything else (a grouped aggregate, `MIN` on a non-numeric column, `AVG` that was not
+/// reduced to `SUM`/`COUNT`) leaves the plan untouched for the normal scan path.
 // Calcite 1.40 removed RelRule.Config.EMPTY; the modern RelRule.Config path requires the
 // Immutables annotation processor. The classic operand() constructor is deprecated but fully
 // supported and far lighter for a single adapter rule — suppression is localized and justified.
@@ -154,6 +155,24 @@ private static RexLiteral evaluate(AggregateCall agg, RelDataType outType, Vorte
                 }
                 yield numericLiteral(rexBuilder, value, outType);
             }
+            case SUM -> {
+                if (agg.getArgList().size() != 1) {
+                    yield null;
+                }
+                String col = resolveColumn(agg.getArgList().getFirst(), scanColumns, project);
+                if (col == null) {
+                    yield null;
+                }
+                // Fold the per-zone SUM rows (metadata-only). A null fold means a zone carries no
+                // usable sum (no zone map, or an overflowed zone) — abandon so the scan computes it.
+                // It also covers SQL's empty/all-null SUM = NULL: zero zones fold to null and the
+                // scan path produces the NULL literal, so the rule need not special-case it.
+                Number sum = table.zoneSum(col);
+                if (sum == null) {
+                    yield null;
+                }
+                yield numericLiteral(rexBuilder, sum, outType);
+            }
             default -> null;
         };
     }
@@ -6,6 +6,7 @@
 import io.github.dfa1.vortex.reader.ScanIterator;
 import io.github.dfa1.vortex.reader.ScanOptions;
 import io.github.dfa1.vortex.reader.VortexReader;
+import io.github.dfa1.vortex.reader.compute.ZoneReducer;
 import io.github.dfa1.vortex.reader.array.BoolArray;
 import io.github.dfa1.vortex.reader.array.ByteArray;
 import io.github.dfa1.vortex.reader.array.DoubleArray;
@@ -86,6 +87,23 @@ public io.github.dfa1.vortex.reader.ArrayStats statsOf(String column) {
         }
     }

+    /// Folds the per-zone `SUM` statistics for `column` without decoding any data segment, or
+    /// returns `null` when the zone-map table cannot answer the reduction — a column with no zone
+    /// map, or a zone whose sum was not retained (e.g. an overflowed zone) — so the caller scans.
+    ///
+    /// Integer columns fold into a [Long] (exact); floating columns into a [Double]. Used by the
+    /// aggregate push-down rule to answer `SUM` (ADR 0013 §6).
+    ///
+    /// @param column the numeric column name
+    /// @return the column sum as a [Long] or [Double], or `null` if the column has no zone map or
+    ///         any zone lacks a usable sum
+    public Number zoneSum(String column) {
+        try (VortexReader reader = VortexReader.open(file)) {
+            return new ZoneReducer(reader).sum(column);
+        } catch (IOException e) {
+            throw new UncheckedIOException("cannot read zone sum of " + file, e);
+        }
+    }
+
     /// Total row count across all chunks, read from chunk metadata without decoding data.
     ///
     /// @return the number of rows in the file
@@ -95,6 +95,48 @@ void minMaxCountRewriteToValuesFromStats() throws Exception {
         }
     }

+    @Test
+    void sumRewritesToValuesFromZoneStats() throws Exception {
+        // Given a whole-table SUM over an integer column (volume, I64 → exact Long) and a floating
+        // column (low, F64 → Double) — both answerable by folding the per-zone SUM rows
+        SchemaPlus root = Frameworks.createRootSchema(true);
+        SchemaPlus vtx = root.add("vtx", new VortexSchema(Map.of("ohlc", file)));
+        FrameworkConfig config = Frameworks.newConfigBuilder()
+                .defaultSchema(vtx)
+                .parserConfig(org.apache.calcite.sql.parser.SqlParser.config()
+                        .withUnquotedCasing(org.apache.calcite.avatica.util.Casing.UNCHANGED))
+                .build();
+        Planner planner = Frameworks.getPlanner(config);
+        SqlNode parsed = planner.parse("select sum(volume), sum(low) from ohlc");
+        RelNode logical = planner.rel(planner.validate(parsed)).rel;
+
+        // When the aggregate push-down rule runs
+        HepProgram program = new HepProgramBuilder()
+                .addRuleCollection(VortexAggregatePushDownRule.RULES)
+                .build();
+        HepPlanner hep = new HepPlanner(program);
+        hep.setRoot(logical);
+        RelNode optimized = hep.findBestExp();
+
+        // Then the plan is a single-row Values — no scan, no aggregate, no data segment decoded
+        String plan = RelOptUtil.toString(optimized);
+        assertThat(plan).contains("LogicalValues").doesNotContain("TableScan").doesNotContain("Aggregate");
+
+        Values values = findValues(optimized);
+        assertThat(values).isNotNull();
+        List<RexLiteral> sumRow = values.getTuples().getFirst();
+
+        // And the folded sums equal what ZoneReducer computes — exact Long for volume (not widened
+        // to Double), Double for low
+        try (VortexReader reader = VortexReader.open(file)) {
+            Number volumeSum = VortexAggregates.of(reader, "volume").sum();
+            Number lowSum = VortexAggregates.of(reader, "low").sum();
+            assertThat(volumeSum).isInstanceOf(Long.class);
+            assertThat(sumRow.get(0).getValueAs(Long.class)).isEqualTo(volumeSum.longValue());
+            assertThat(sumRow.get(1).getValueAs(Double.class)).isEqualTo(lowSum.doubleValue());
+        }
+    }
+
     @Test
     @SuppressWarnings("try") // the Hook.Closeable is used only for its scope (deregister on close)
     void pushDownRunsEndToEndThroughJdbcPlanner() throws Exception {
@@ -56,11 +56,19 @@ void countColumn_withProjectPath_rewritesToValues() {
     }

     @Test
-    void sum_hasNoZoneStat_abandonsRewrite() {
-        // Given SUM(volume) — no SUM zone statistic exists, so evaluate() yields null and the whole
-        // rewrite is abandoned
+    void sum_withZoneStat_rewritesToValues() {
+        // Given SUM(volume) — the Java writer emits a per-zone SUM stat, so ZoneReducer folds every
+        // zone metadata-only and the whole rewrite succeeds
+        // When / Then — answered from the zone-map table, no Aggregate/TableScan left
+        assertThat(optimize("select sum(volume) from ohlc")).contains("LogicalValues").doesNotContain("Aggregate");
+    }
+
+    @Test
+    void sumOverComputedExpression_abandonsRewrite() {
+        // Given SUM(volume + 1) — the projected input is an expression, not a bare column ref, so
+        // resolveColumn returns null and the SUM branch abandons
         // When / Then — the Aggregate survives for the normal scan path
-        assertThat(optimize("select sum(volume) from ohlc")).contains("Aggregate");
+        assertThat(optimize("select sum(volume + 1) from ohlc")).contains("Aggregate");
     }

     @Test