feat: add CollectLeft partition mode to SortMergeJoinExec #23111
Dandandan wants to merge 2 commits into
Add `PartitionMode::CollectLeft` for `SortMergeJoinExec`. The left side is collected into a single sorted run shared across all right partitions, and the right side is left un-repartitioned (one output partition per right partition). This avoids hash-repartitioning the right side and is beneficial when the left is small and the right is large (analogous to a broadcast hash join, but for sort-merge).

Supported only for join types whose output is determined per right partition: Inner, Right, RightSemi, RightAnti, RightMark. Left-side joins would require tracking left-row matches across all right partitions and are not supported.

The `JoinSelection` optimizer rule now switches a Partitioned SMJ to CollectLeft when the join type is supported and the left side is estimated small enough (reusing `hash_join_single_partition_threshold[_rows]`).

- exec.rs: `mode` + shared `left_fut` (OnceAsync); `with_mode()` validation; collect-once + replay-per-partition in `execute()`; `required_input_distribution` = [SinglePartition, Unspecified]; asymmetric output partitioning; mode-aware `partition_statistics`; mode shown in EXPLAIN; mode preserved through `with_new_children` and projection pushdown.
- join_selection.rs: `try_collect_left_sort_merge_join` branch.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
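For readers skimming the commit, the selection rule it describes can be sketched roughly as follows. This is a self-contained illustration only: the enums, `LeftEstimate`, `is_small`, and `choose_mode` are hypothetical stand-ins, not the DataFusion types or the body of `try_collect_left_sort_merge_join`, and the way the two thresholds are combined (byte estimate first, row estimate as a fallback) is an assumption.

```rust
/// Toy stand-ins for illustration; these are NOT the DataFusion types.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinType {
    Inner, Left, Right, Full,
    LeftSemi, RightSemi, LeftAnti, RightAnti, LeftMark, RightMark,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PartitionMode {
    Partitioned,
    CollectLeft,
}

/// Join types whose output can be produced independently per right partition.
pub fn supports_collect_left(join_type: JoinType) -> bool {
    matches!(
        join_type,
        JoinType::Inner
            | JoinType::Right
            | JoinType::RightSemi
            | JoinType::RightAnti
            | JoinType::RightMark
    )
}

/// Hypothetical size estimate for the left input; `None` means "unknown".
#[derive(Clone, Copy, Debug, Default)]
pub struct LeftEstimate {
    pub bytes: Option<usize>,
    pub rows: Option<usize>,
}

/// Assumed rule: use the byte estimate when known, else the row estimate;
/// with no estimate at all, never treat the left side as small.
pub fn is_small(left: LeftEstimate, max_bytes: usize, max_rows: usize) -> bool {
    match (left.bytes, left.rows) {
        (Some(bytes), _) => bytes <= max_bytes,
        (None, Some(rows)) => rows <= max_rows,
        (None, None) => false,
    }
}

/// Switch to `CollectLeft` only for a supported join type with a small left.
pub fn choose_mode(
    join_type: JoinType,
    left: LeftEstimate,
    max_bytes: usize,
    max_rows: usize,
) -> PartitionMode {
    if supports_collect_left(join_type) && is_small(left, max_bytes, max_rows) {
        PartitionMode::CollectLeft
    } else {
        PartitionMode::Partitioned
    }
}
```

With this sketch, an `Inner` join whose left side is estimated at a few kilobytes picks `CollectLeft`, while a `Left` join of the same size, or any join with no estimate, stays `Partitioned`.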

run benchmarks env:

🤖 Benchmark running (GKE): comparing feat/sort-merge-join-collect-left (f7412c8) to 3f4bcf1 (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing feat/sort-merge-join-collect-left (f7412c8) to 3f4bcf1 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing feat/sort-merge-join-collect-left (f7412c8) to 3f4bcf1 (merge-base) using tpch

Resolve conflict in sort_merge_join/exec.rs: upstream replaced `partition_statistics(Option<usize>)` with `statistics_with_args(&StatisticsArgs)`. Re-apply the CollectLeft mode-aware logic in the new API (mirroring HashJoinExec): CollectLeft uses the full left stats + per-partition right stats; Partitioned uses per-partition stats for both. Update the SMJ statistics regression test to call `statistics_with_args`.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
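The mode-aware statistics rule in this commit comes down to which input statistics feed the join estimate. A rough sketch, with the caveat that `ToyInput` and `join_input_rows` are hypothetical names that model statistics as plain per-partition row counts and say nothing about the real `statistics_with_args` signature:

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PartitionMode {
    Partitioned,
    CollectLeft,
}

/// Toy input: only tracks how many rows each partition holds.
pub struct ToyInput {
    pub rows_per_partition: Vec<usize>,
}

impl ToyInput {
    /// `Some(i)` returns the rows of partition `i` (panics if out of range);
    /// `None` returns the total across all partitions.
    pub fn rows(&self, partition: Option<usize>) -> usize {
        match partition {
            Some(i) => self.rows_per_partition[i],
            None => self.rows_per_partition.iter().sum(),
        }
    }
}

/// Returns the (left, right) row counts used to estimate output partition
/// `partition` of the join.
pub fn join_input_rows(
    mode: PartitionMode,
    left: &ToyInput,
    right: &ToyInput,
    partition: Option<usize>,
) -> (usize, usize) {
    match mode {
        // Every right partition sees the whole collected left side.
        PartitionMode::CollectLeft => (left.rows(None), right.rows(partition)),
        // Partition i of the left is joined with partition i of the right.
        PartitionMode::Partitioned => (left.rows(partition), right.rows(partition)),
    }
}
```

In `CollectLeft` mode the left count is the same for every output partition, which is why asking the left input for a specific partition's statistics would under-count it.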

🤖 Benchmark completed (GKE) for tpch: base (merge-base) vs. branch

run benchmark tpch10

Hi @Dandandan, your benchmark configuration could not be parsed (#23111 (comment)). Error: Usage: Any benchmark name is accepted: Per-side configuration (

    env:
      # shared env is inherited by BOTH the build and the run, so build
      # flags go here. Builds default to no debuginfo for speed; opt back
      # in for hung-job gdb dumps and cap jobs to stay within memory:
      CARGO_PROFILE_RELEASE_DEBUG: "1"
      CARGO_BUILD_JOBS: "1"
    baseline:
      ref: v45.0.0
      env:
        # per-side env only reaches the benchmark run, not the build
        DATAFUSION_RUNTIME_MEMORY_LIMIT: 1G
    changed:
      ref: v46.0.0
      env:
        DATAFUSION_RUNTIME_MEMORY_LIMIT: 2G

run benchmark tpch10

run benchmarks

🤖 Benchmark running (GKE): comparing feat/sort-merge-join-collect-left (93f94c1) to 46b508e (merge-base) using clickbench_partitioned

🤖 Benchmark completed (GKE) for clickbench_partitioned: base (merge-base) vs. branch

🤖 Benchmark running (GKE): comparing feat/sort-merge-join-collect-left (93f94c1) to 46b508e (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing feat/sort-merge-join-collect-left (93f94c1) to 46b508e (merge-base) using tpch

🤖 Benchmark running (GKE): comparing feat/sort-merge-join-collect-left (93f94c1) to 46b508e (merge-base) using tpch10

🤖 Benchmark completed (GKE) for tpcds: base (merge-base) vs. branch

🤖 Benchmark completed (GKE) for tpch: base (merge-base) vs. branch

🤖 Benchmark completed (GKE) for tpch10: base (merge-base) vs. branch

🤖 Benchmark completed (GKE) for clickbench_partitioned: base (merge-base) vs. branch

🤖 Benchmark completed (GKE) for tpcds: base (merge-base) vs. branch

Hmm, this seems quite a bit slower in most cases (probably because merging is slow/single-threaded at the moment).

Which issue does this PR close?

Rationale for this change

`SortMergeJoinExec` currently always runs in a symmetric, hash-partitioned mode: both inputs are hash-partitioned on the join keys and sorted, and partition `i` of the left is merge-joined with partition `i` of the right.

When one side is small, hash-repartitioning the large side is wasteful. For hash joins this is already handled by `PartitionMode::CollectLeft` (collect the small build side once, broadcast it, don't repartition the probe side). This PR brings the same idea to sort-merge joins.

What changes are included in this PR?

Add `PartitionMode::CollectLeft` support to `SortMergeJoinExec`:

- The left side is collected into a single sorted run (shared across all right partitions via `OnceAsync`) and the right side is left un-repartitioned: each right partition merge-joins the full collected left, producing one output partition per right partition.
- `required_input_distribution` becomes `[SinglePartition, Unspecified]` (the right keeps whatever partitioning it has; both sides still require their sort ordering), and output partitioning follows the right side.
- Supported only for join types whose output is determined per right partition: `Inner`, `Right`, `RightSemi`, `RightAnti`, `RightMark`. Left-side joins (`Left`/`LeftSemi`/`LeftAnti`/`LeftMark`/`Full`) would require tracking left-row matches across all right partitions and are rejected by `with_mode`.
- The `JoinSelection` physical-optimizer rule switches a `Partitioned` SMJ to `CollectLeft` when the join type is supported and the left side is estimated to be small enough, reusing the existing `hash_join_single_partition_threshold` / `hash_join_single_partition_threshold_rows` thresholds. `EnsureRequirements` then collapses the left to one sorted run (`SortExec`/`SortPreservingMergeExec`) and leaves the right un-hash-partitioned.
- The mode is shown in `EXPLAIN` (only when `CollectLeft`, to avoid churning existing `Partitioned` plans) and is preserved across `with_new_children` and projection pushdown; `partition_statistics` is made mode-aware (mirroring `HashJoinExec`).

The existing k-way merge / streamed-vs-buffered execution is reused unchanged: the collected left is replayed as the left input to each right partition's join.
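
As a reading aid, the collect-once / replay-per-partition idea can be sketched with plain standard-library types. This is a simplified model, not the code in exec.rs: `Row`, `SharedLeft`, `merge_join_inner`, and `execute_collect_left` are hypothetical names, `OnceLock` stands in for the async `OnceAsync`, everything runs synchronously, and only the inner-join case is shown.

```rust
use std::sync::{Arc, OnceLock};

/// (join key, payload) pair; a toy row type for illustration.
pub type Row = (i32, &'static str);
pub type Joined = (i32, &'static str, &'static str);

/// Inner merge join of two inputs sorted by key; handles duplicate keys.
pub fn merge_join_inner(left: &[Row], right: &[Row]) -> Vec<Joined> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < left.len() && j < right.len() {
        if left[i].0 < right[j].0 {
            i += 1;
        } else if left[i].0 > right[j].0 {
            j += 1;
        } else {
            // Find the run of equal keys on both sides and emit their cross product.
            let key = left[i].0;
            let i_end = i + left[i..].iter().take_while(|r| r.0 == key).count();
            let j_end = j + right[j..].iter().take_while(|r| r.0 == key).count();
            for l in &left[i..i_end] {
                for r in &right[j..j_end] {
                    out.push((key, l.1, r.1));
                }
            }
            i = i_end;
            j = j_end;
        }
    }
    out
}

/// Stand-in for the shared left future: collected and sorted at most once.
pub struct SharedLeft {
    cell: OnceLock<Arc<Vec<Row>>>,
    source: Vec<Vec<Row>>,
}

impl SharedLeft {
    pub fn new(left_partitions: Vec<Vec<Row>>) -> Self {
        Self { cell: OnceLock::new(), source: left_partitions }
    }

    /// The first caller collects all left partitions into one sorted run;
    /// later callers get a handle to the same allocation.
    pub fn get(&self) -> Arc<Vec<Row>> {
        self.cell
            .get_or_init(|| {
                let mut all: Vec<Row> = self.source.iter().flatten().copied().collect();
                all.sort_by_key(|r| r.0);
                Arc::new(all)
            })
            .clone()
    }
}

/// One output partition per right partition; each right partition (already
/// sorted by key) is joined against the full collected left.
pub fn execute_collect_left(left: &SharedLeft, right_partitions: &[Vec<Row>]) -> Vec<Vec<Joined>> {
    right_partitions
        .iter()
        .map(|right| merge_join_inner(&left.get(), right))
        .collect()
}
```

The sketch also shows why left-side join types do not fit this mode: no single right partition can tell whether a left row went unmatched everywhere, so emitting unmatched left rows would need state shared across all output partitions.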

Are these changes tested?

Yes:

- Tests in `physical-plan`: `CollectLeft` output equals the default `Partitioned` output for all supported join types over a multi-partition right; unsupported join types are rejected; mode-aware `partition_statistics` over a multi-partition right.
- Tests in `core`: `JoinSelection` selects `CollectLeft` for a small left, and stays `Partitioned` for a big left or an unsupported join type.
- `sort_merge_join.slt` / `joins.slt` exercise the full pipeline (`JoinSelection` → `EnsureRequirements` → `SanityCheckPlan` → execution) with `prefer_hash_join = false`; results are unchanged, only `EXPLAIN` plans change (no hash repartition on the join key; left collapsed to one sorted run). A new regression in `sort_merge_join.slt` covers a narrowing projection pushed into a `CollectLeft` join over a multi-partition right.

Are there any user-facing changes?

`EXPLAIN` output now shows `mode=CollectLeft` on sort-merge joins that use the new mode, and such plans avoid hash-repartitioning the right input. No public API breakage (the mode defaults to `Partitioned`; `with_mode` is additive). No new configuration options.

🤖 Generated with Claude Code