
Comet produces bloated results in comparison with Spark #4723

Description

@sandugood

Describe the bug

I was testing Comet in a multi-stage ETL process (it contains multiple joins and native Iceberg scans). Some of the steps look like a usual Comet execution:

CometSinkPlaceHolder
+- CometColumnarExchange
   +- Project
      +- SortMergeJoin
         :- CometSort
         :  +- CometSinkPlaceHolder
         :     +- CometColumnarExchange
         :        +- Project
         :           +- SortMergeJoin
         :              :- CometSort
         :              :  +- CometSinkPlaceHolder
         :              :     +- CometColumnarExchange
         :              :        +- Project
         :              :           +- SortMergeJoin
         :              :              :- CometSort
         :              :              :  +- CometSinkPlaceHolder
         :              :              :     +- CometColumnarExchange
         :              :              :        +- Project
         :              :              :           +- SortMergeJoin
         :              :              :              :- CometSort
         :              :              :              :  +- CometSinkPlaceHolder
         :              :              :              :     +- CometColumnarExchange
         :              :              :              :        +- Project
         :              :              :              :           +- SortMergeJoin
         :              :              :              :              :- Sort
         :              :              :              :              :  +-  HashAggregate [COMET: Unsupported data type: TimestampNTZType, Unsupported aggregate expression(s)]
         :              :              :              :              :     +- CometSinkPlaceHolder
         :              :              :              :              :        +- CometColumnarExchange
         :              :              :              :              :           +- HashAggregate
         :              :              :              :              :              +-  InMemoryTableScan [COMET: InMemoryTableScan is not supported]
         :              :              :              :              :                    +- InMemoryRelation
         :              :              :              :              :                          +- CometExchange
         :              :              :              :              :                             +- CometProject
         :              :              :              :              :                                +- CometSortMergeJoin
         :              :              :              :              :                                   :- CometSort
         :              :              :              :              :                                   :  +- CometExchange
         :              :              :              :              :                                   :     +- CometProject
         :              :              :              :              :                                   :        +- CometFilter
         :              :              :              :              :                                   :           +- CometIcebergNativeScan
         :              :              :              :              :                                   +- CometSort
         :              :              :              :              :                                      +- CometFilter
         :              :              :              :              :                                         +- CometHashAggregate
         :              :              :              :              :                                            +- CometExchange
         :              :              :              :              :                                               +- CometHashAggregate
         :              :              :              :              :                                                  +- CometUnion
         :              :              :              :              :                                                     :- CometProject
         :              :              :              :              :                                                     :  +- CometFilter
         :              :              :              :              :                                                     :     +- CometIcebergNativeScan
         :              :              :              :              :                                                     +- CometIcebergNativeScan

However, when aggregating the final result (as a sanity check of Comet against Spark), the sum aggregation returns bloated values. The discrepancy is visible even without the aggregation, just by comparing single values for individual users (domain-specific) across the two resulting tables.

Note:

  • "spark.comet.scan.icebergNative.enabled": "true" is set, so the native Iceberg scan is enabled. It is built from the main branch of both the iceberg-rust and iceberg-storage-opendal repos. Why? Main contains a fix for reading .parquet files that have no page index. Before that fix the whole query would fail. Now it runs, but produces results that do not match Spark. See "fix(reader): graceful handling of missing column index", iceberg-rust#2693.
  • The bloated values are consistent and deterministic, so this isn't related to spark.comet.exec.strictFloatingPoint being set to true or false. The wrong results are reproducible.

When the native Iceberg scan is disabled, the query runs as intended: the results are stored in an Iceberg table and are consistent between vanilla Spark and Comet. So the problem lies somewhere in the iceberg-rust integration in Comet.
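
The A/B comparison described above could be organised as a small config matrix. This is only a sketch: the two keys are the ones named in this report, while the helper name `config_matrix` and the idea of running the job once per combination are assumptions, not part of the original test.

```python
from itertools import product

# Both keys are taken from the report; only their values are toggled here.
NATIVE_SCAN = "spark.comet.scan.icebergNative.enabled"
STRICT_FP = "spark.comet.exec.strictFloatingPoint"


def config_matrix():
    """Build one config dict per combination of the two flags (hypothetical helper)."""
    return [
        {NATIVE_SCAN: native, STRICT_FP: strict}
        for native, strict in product(("true", "false"), repeat=2)
    ]


for conf in config_matrix():
    # Each dict could be applied to a session builder via .config(key, value)
    # before running the same ETL job and storing its result separately.
    print(conf)
```

If the mismatch follows only the native scan flag and not the floating-point flag, that would support the observation in the notes above.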

Steps to reproduce

No response

Expected behavior

The same values (or almost the same, down to floating-point precision) for both Spark and Comet.
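
One way to make this expectation checkable is a per-key comparison with a relative tolerance. The sketch below is a minimal illustration, not the check used in this report: it assumes the per-user sums from each run have already been collected into plain dicts, and the names `compare_sums`, `spark_sums`, and `comet_sums` are hypothetical.

```python
import math


def compare_sums(spark_sums, comet_sums, rel_tol=1e-9):
    """Return keys whose values differ beyond floating-point tolerance.

    Both arguments are hypothetical {user_id: sum} dicts collected from the
    vanilla Spark run and the Comet run. Each mismatch is reported as
    (key, spark_value, comet_value, ratio), where ratio is comet / spark
    (None when the key is missing on one side or the Spark value is 0).
    """
    mismatches = []
    for key in sorted(set(spark_sums) | set(comet_sums)):
        expected = spark_sums.get(key)
        actual = comet_sums.get(key)
        if expected is None or actual is None:
            mismatches.append((key, expected, actual, None))
        elif not math.isclose(expected, actual, rel_tol=rel_tol):
            ratio = actual / expected if expected != 0 else None
            mismatches.append((key, expected, actual, ratio))
    return mismatches


# Made-up illustrative values, not data from this issue.
spark_sums = {"u1": 10.0, "u2": 0.1 + 0.2, "u3": 7.5}
comet_sums = {"u1": 20.0, "u2": 0.3, "u3": 7.5}
print(compare_sums(spark_sums, comet_sums))
```

Here only `u1` is reported, since `u2` differs by rounding alone. The ratio column is a heuristic: whole-number ratios for most keys would hint at duplicated rows rather than arithmetic differences, but that is an assumption to verify, not a diagnosis.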

Additional context

No response
