fix(cubestore): repartition merge commits with unchecked swap (fixes RepartitionRange row-count regression)#11091
Draft
paveltiunov wants to merge 9 commits into
Draft
Conversation
Add an opt-in sequential prefetch to the batch repartition loop. A producer downloads upcoming chunk parquets (anchor last, others by ascending id) into the local cache while the consumer repartitions the current chunk, so the download latency is hidden behind processing. A byte-budget semaphore bounds how much fetched-but-unprocessed data sits on local disk: each chunk holds permits worth its file size from before its download until the consumer finishes it. Chunks prefetched past the time budget stay local and warm the follow-up job, which lands on the same node by partition. download_file is idempotent and dedups in-flight downloads, so the consumer's repartition_chunk just hits the warm file. The producer makes no metastore calls; the chunk list is still read once. Controlled by CUBESTORE_REPARTITION_PREFETCH_BUDGET (accepts size suffixes, e.g. 512MB); None (unset) or 0 disables prefetching. Added env_optparse_size to keep None distinct from an explicit value.
Add an opt-in merge path for repartitioning an inactive parent's persisted chunks. Instead of splitting each chunk independently into the active children, the parent's chunks are merged k-way (in groups of up to a configurable cap) and the sorted stream is split directly into the children at the wal-split limit in one streaming pass, producing full-size chunks and avoiding the per-chunk fragment fan-out plus the compaction that would later merge them. Each group commits with a single atomic swap_chunks; a group whose source was already repartitioned by a racing job is skipped (its new chunks stay inactive and are GC'd). A fully empty group deactivates its sources directly. Children must tile the parent's range exactly, otherwise the table is deactivated as on the per-chunk path. The anchor is processed last so it keeps holding the job dedup key until the run finishes or yields on the time budget. Reuses the compaction streaming writer machinery via a new write_chunks_split_into_children that cuts files on child boundaries and the row-count limit. Controlled by CUBESTORE_REPARTITION_MERGE_MAX_INPUT_FILES (Option<usize>); None / 0 / 1 disable it, Some(m >= 2) caps each group at m input chunks. Default off.
…ector Add a third repartition strategy that slices an inactive parent's persisted chunks into RepartitionRange jobs at schedule time. Slicing walks all chunks (active and inactive) ordered by id and cuts a range once it reaches the row or chunk-count cap, so the [start, end] bounds stay pinned to chunk ids and a re-slice reproduces them; the end is carried as job data, not the dedup key, so a tail that extends the trailing range dedups on its start instead of spawning a second job. Each range runs as one atomic swap on the worker chosen by the hash of its bounds, restoring cross-worker parallelism. A GC gate keeps an inactive parent's chunks until it fully drains so slicing stays stable. Replace the ad-hoc flags with a single CUBESTORE_REPARTITION_STRATEGY selector (per_chunk default, per_partition, range); an unknown value logs a warning and falls back to per_chunk. The merge caps (CUBESTORE_REPARTITION_MERGE_MAX_INPUT_FILES default 50, CUBESTORE_REPARTITION_MERGE_MAX_ROWS default 4000000) become plain caps with defaults. The per-partition merge core is shared between the in-job loop and the range handler. JobType::RepartitionRange deserializes as an unknown variant on binaries that predate it, so it is only safe behind the skip-unknown-jobs handling; enable the strategy per-deployment.
- compute_repartition_children: an empty children set is treated as a transient topology read and returns an error to retry, instead of deactivating the table; deactivation now only happens when children are present but do not tile the parent's range (genuine corruption). - Cache the draining-parents set (inactive parents with active chunks) with a short TTL so the GC loop does not re-scan the chunk table every cycle when the range strategy is enabled. - Prefetch: bill a chunk with no recorded file_size at the full budget instead of ~free, so an unknown-size fetch cannot overshoot the on-disk budget. - Clamp the range slicing caps to >= 1 so a misconfigured 0 does not break the slicing loop before adding a chunk.
…e dedup - repartition_partition_chunks_merged now groups chunks by max_rows OR max_input_files (whichever is hit first), matching how the range strategy slices its jobs, instead of grouping purely by chunk count. - Comment the last-row/aggregate dedup in merge_chunk_group_into_children: it is the same merge_chunks compaction uses; for unique-key tables the sort key ends with the seq column so the latest version wins, and group order only breaks ties between rows with an identical (unique key, seq).
… strategy repartition_partition_chunks_yields_on_budget and repartition_partition_chunks_prefetch_drains assert per-chunk-path behavior, so set the strategy explicitly instead of inheriting the default — keeps them valid regardless of the configured strategy (mirrors how the merge/range tests pin theirs).
…urrent download in merge Remove CUBESTORE_BATCH_REPARTITION / batch_repartition_enabled. The repartition strategy (per_chunk / per_partition / range) is now the sole selector: - per_chunk: one job per chunk -> repartition_chunk - per_partition: one anchor job per partition -> merge - range: RepartitionRange jobs -> merge The job handler and scheduler dispatch on the strategy directly. The dead per_chunk+batch hybrid (per-chunk loop and its producer/consumer prefetch) is gone, along with its two now-obsolete tests. Prefetch is reworked: drop the byte-budget producer/consumer and CUBESTORE_REPARTITION_PREFETCH_BUDGET; add a plain bool CUBESTORE_REPARTITION_CONCURRENT_DOWNLOAD (default off) that downloads a merge group's chunk parquets concurrently before building the merge inputs. It applies to both per_partition and range; the group is already bounded by repartition_merge_max_input_files and the pool by download_concurrency, so no extra budget is needed.
The merge-based repartition path (per_partition / range strategies) commits its new chunks with swap_chunks, which enforces that the activated row count equals the deactivated row count. merge_chunks aggregates (aggregate indexes) and last-row-dedups (unique-key tables) the source group, so it legitimately emits fewer rows than it consumed. The checked swap then rejected the commit with "Deactivated row count (..) doesn't match activated row count (..) during swap", failing RepartitionRange / per-partition jobs. Commit with swap_chunks_without_check instead, matching how compaction commits its dedup'd merges. Adds a unit test on an aggregate index that reproduces the exact failure before the fix. Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
…dedup Adds range- and per-partition-strategy SQL tests on an aggregate-index table whose chunks share dimension keys across inserts. The repartition merge groups those rows by the sort key, so the swap activates fewer rows than it deactivates - the production RepartitionRange row-count-mismatch scenario. Without the unchecked-swap fix the repartition jobs never drain the inactive parents; with it the data stays consistent (sum(m) and per-key sums conserved). Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## cubestore-chunk-repartition-speed-up #11091 +/- ##
=======================================================================
Coverage ? 58.50%
=======================================================================
Files ? 216
Lines ? 17270
Branches ? 3524
=======================================================================
Hits ? 10103
Misses ? 6652
Partials ? 515
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
PR #11088 introduced a merge-based repartition path (
per_partitionandrangestrategies). In production withrangeenabled this regressed with a flood of failingRepartitionRangejobs:Root cause
The new merge path (
ChunkStore::merge_chunk_group_into_children) k-way merges a group of source chunks throughmerge_chunks, then commits the new child chunks.merge_chunksaggregates (for aggregate indexes — i.e. pre-aggregation rollups) and last-row-dedups (for unique-key tables) the group, so it legitimately emits fewer rows than it consumed.The commit used the checked
swap_chunks, which enforcesactivated_row_count == deactivated_row_count. For any aggregate/unique-key table the merge collapses rows, so the check rejected the swap and the repartition job failed permanently — the inactive parent never drained.The legacy per-chunk path (
partition_rows) never aggregated (it only routes rows by partition key), so the row count was always conserved there and the check passed — which is why this only surfaced on the new merge strategies.Fix
Commit the merge with
swap_chunks_without_check, exactly as compaction already does for its own dedup'd merges (compact_chunks_to_in_memory/compact_chunks_to_persistent). The row-count equality invariant does not hold for an aggregating/deduping merge, so enforcing it is incorrect for this path.Tests
Following the request to reproduce the error first, then fix it:
store::tests::repartition_merge_aggregate_index_collapses_rows: builds an aggregate index whose two source chunks share every key, drivesrepartition_chunk_range, and asserts the children conserve the aggregated row count. Before the fix this panics with the exact production error:Deactivated row count (20) doesn't match activated row count (10) during swap of (1, 2) to (3, 4) chunks.repartition_range_jobs_aggregate_index_keeps_data_consistentandrepartition_merge_aggregate_index_keeps_data_consistent: aggregate-index table, dimension keys repeated across inserts, forced split, full scheduler → range-slicing → job-runner path. Before the fix the inactive parents never drain (jobs keep failing the swap); after itsum(m)and every per-key sum are conserved.All 15
repartition*unit/SQL tests pass with the fix; the three new tests fail without it.Risk
Mirrors the long-standing compaction behavior; default strategy remains
per_chunk(unchanged). No metastore/schema changes.