Skip to content

refactor(hash-aggr): Use EmitTo to output#23055

Open
2010YOUY01 wants to merge 8 commits into
apache:mainfrom
2010YOUY01:split-aggr-refactor-output
Open

refactor(hash-aggr): Use EmitTo to output#23055
2010YOUY01 wants to merge 8 commits into
apache:mainfrom
2010YOUY01:split-aggr-refactor-output

Conversation

@2010YOUY01

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Part of #22710

Rationale for this change

Regarding the EPIC issue: I have drafted all the migrations locally, and verified that after deleting the old implementation, UTs are passing.

We are now about 4 feature migration PRs away from completing the EPIC. Before continuing with those migrations, this PR performs some cleanup and refactoring.

What changes are included in this PR?

This PR can be read commit by commit:

  • commit 1: use EmitTo for incremental outputting
  • commit 2: split hash_table.rs into small files

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Jun 20, 2026
@2010YOUY01 2010YOUY01 marked this pull request as draft June 21, 2026 01:16
@2010YOUY01 2010YOUY01 marked this pull request as ready for review June 21, 2026 01:16
@2010YOUY01 2010YOUY01 closed this Jun 21, 2026
@2010YOUY01 2010YOUY01 reopened this Jun 21, 2026
@alamb

alamb commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

Regarding the EPIC issue: I have drafted all the migrations locally, and verified that after deleting the old implementation, UTs are passing.

Amazing

@alamb alamb left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is amazing @2010YOUY01 -- thank you. I found this code really easy to follow and understand. While it is complicated, I think it much more closely mirrors the complexity of the problem being solved now and setting up the control flow logic in this way means we will be in a much better place to improve the performance / featuers going forward

👏

cc @Rachelint

AggregateExec, PhysicalGroupBy, aggregate_expressions, evaluate_group_by,
};

/// Marker for raw rows -> partial state aggregation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this structure and how it makes it clearer what is going on with the state here

Comment thread datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs Outdated
Comment thread datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor is that the structuis called final but the module is called final_table.rs -- should we keep it consistent with final.rs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that is the marker struct for hash table aggregation mode. I have renamed it AggregateHashTable<Final> -> AggregateHashTable<FinalMarker> to make it more explicit

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likewise here, the struct is named Partial but the module partial_table.rs -- recommend partial.rs to be consistent

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above ⬆️

) -> Result<Option<RecordBatch>> {
let output_schema = Arc::clone(&self.output_schema);
let batch_size = self.batch_size;
match &mut self.state {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this state match and some of the outputtting state is duplicated across the types of tables, but I think it is ok

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several small differences like metrics tracking, so probably it's clearer to keep them separated 🤔

Comment thread datafusion/physical-plan/src/aggregates/aggregate_hash_table/partial_table.rs Outdated
/// In skip-partial-aggregation optimization, when a decision has made to skip
/// partial stage, build a typed hash table only for aggregation state conversion
/// row-by-row.
pub(in crate::aggregates) fn partial_skip_table(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could avoid some clones below if this consumed self rather than took it by reference

Maybe it doesn't matter

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's doable, and I think we can further simplify it into a much smaller struct since for partial aggregation skip stage, only a bunch of GroupsAccmulators are used.

This requires a separate PR, but I agree it's more of a idea to polish the code, not super important for now, I'll try to address it when the refactor is mostly done

.building()
.accumulators
.iter()
.all(|acc| acc.supports_convert_to_state())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should try and remove this "supports_convert_to_state" API (as a follow on PR / project) to simplify the hash aggregate code and ensure all our groups accumulators have the high performance APIs.

I filed a ticket

Comment thread datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs Outdated
Comment thread datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs Outdated
Comment thread datafusion/physical-plan/src/aggregates/aggregate_hash_table/common.rs Outdated
Comment thread datafusion/physical-plan/src/aggregates/aggregate_hash_table/partial_table.rs Outdated
@2010YOUY01

Copy link
Copy Markdown
Contributor Author

All comments have been addressed, thank you for the careful reviews! @alamb @Rachelint

I found this code really easy to follow and understand. While it is complicated, I think it much more closely mirrors the complexity of the problem being solved now and setting up the control flow logic in this way means we will be in a much better place to improve the performance / featuers going forward

I only figured this out very recently. The split-stream approach is somewhat counterintuitive: it does introduce a lot of duplicated code, but it can make the code easier to work with.

The key idea, I think, is problem decomposition. If we can break a large problem into smaller subproblems, we can tackle each of them individually.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants