[WIP][SPARK-55951][SPARK-55952][SPARK-55953][SQL] Add post-processing for CDC changelog tables#55426
SanJSp wants to merge 2 commits into `apache:master`.
Conversation
Add analyzer rule and execution primitives for Spark-side CDC post-processing:

- `ResolveChangelogTable` analyzer rule: injects carry-over removal and/or update-detection plans above a resolved `DataSourceV2Relation(ChangelogTable)`.
- `CarryOverRemoval` logical node + `CarryOverRemovalExec` physical node + `CarryOverIterator`: sort-based removal of identical delete+insert CoW artifacts, keyed by rowId+rowVersion.
- Update detection: Window-based relabeling of delete+insert pairs within the same rowId+rowVersion partition to `update_preimage`/`update_postimage`.
- `ChangelogTable.validateSchema`: fail-fast validation of required CDC metadata columns, column types, and row-identity requirements.
- New error class `INVALID_CHANGELOG_SCHEMA` with four sub-classes (MISSING_COLUMN, INVALID_COLUMN_TYPE, MISSING_ROW_ID, NESTED_ROW_ID).
- `InMemoryChangelogCatalog` extended with `ChangelogProperties` to configure post-processing scenarios in tests.
- Tests: `ChangelogResolutionSuite` schema-validation cases and `ResolveChangelogTablePostProcessingSuite` end-to-end SQL coverage.
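A rough standalone model of the sort-based carry-over removal described above (hypothetical `ChangeRow` type, not the PR's `InternalRow`-based implementation): with input sorted by (rowId, rowVersion), a delete immediately followed by an insert with the same key and identical data columns is a copy-on-write artifact, and both rows are dropped.

```scala
// Hypothetical row type standing in for the PR's InternalRow + ordinals.
final case class ChangeRow(rowId: Long, rowVersion: Long, changeType: String, data: String)

// Assumes `sorted` is ordered by (rowId, rowVersion), as the PR's sort-based
// approach requires. Identical delete+insert pairs are dropped; everything
// else passes through unchanged.
def removeCarryOvers(sorted: Seq[ChangeRow]): Seq[ChangeRow] = {
  val out = scala.collection.mutable.Buffer.empty[ChangeRow]
  var pendingDelete: Option[ChangeRow] = None
  def flush(): Unit = { pendingDelete.foreach(out += _); pendingDelete = None }
  for (r <- sorted) r.changeType match {
    case "delete" =>
      flush(); pendingDelete = Some(r) // buffer: the matching insert may follow
    case "insert" if pendingDelete.exists(d =>
        d.rowId == r.rowId && d.rowVersion == r.rowVersion && d.data == r.data) =>
      pendingDelete = None // identical delete+insert pair: carry-over, drop both
    case _ =>
      flush(); out += r
  }
  flush()
  out.toSeq
}
```

A non-identical delete+insert pair (a real update) is kept as-is here; relabeling it is the separate update-detection step.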
```scala
val rowIdColumnNames = cl.rowId().map(_.fieldNames()(0)).toSeq
val rowVersionColumnName = cl.rowVersion().fieldNames()(0) // e.g. "_commit_version"
```
We pass column names and resolve them to ordinals at execution time in CarryOverRemovalExec.doExecute(). Ordinals computed at analysis time were tried first but broke: ColumnPruning / projection rewrites between analysis and physical planning can reorder or re-number columns, so analysis-time ordinals pointed at the wrong fields by the time the exec node ran. Resolving names against child.output at execute time dodges that.
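The name-to-ordinal resolution described above could look roughly like this (a hypothetical standalone sketch; `childOutput` stands in for `child.output.map(_.name)`, and `equalsIgnoreCase` approximates Spark's default case-insensitive resolution):

```scala
// Resolve CDC column names against the child operator's actual output at
// execution time, so optimizer-driven column reordering cannot invalidate
// ordinals computed earlier.
def resolveOrdinals(columnNames: Seq[String], childOutput: Seq[String]): Seq[Int] =
  columnNames.map { name =>
    val ordinal = childOutput.indexWhere(_.equalsIgnoreCase(name))
    require(ordinal >= 0,
      s"CDC column '$name' not found in child output: ${childOutput.mkString(", ")}")
    ordinal
  }
```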
```scala
  }
} else {
  // pendingDelete != null && input.hasNext
  val nextRow = input.next().copy()
```
Input rows may be mutable/reused by the upstream iterator. Any row buffered for comparison against the next row is defensively copied via .copy(). This is the standard DSv2 pattern, but the per-row copy cost is non-trivial on large scans. Confirm whether the upstream stage here guarantees non-reused rows (in which case we could drop the copy), or whether we should keep the defensive copy.
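A toy reproduction (hypothetical helpers, not the PR's `InternalRow`) of why the defensive copy matters: when the upstream iterator writes every row into one shared buffer, a buffered reference silently mutates into the next row.

```scala
// Pair up consecutive rows, buffering the first of each pair. With
// copyRows = false the buffered reference is corrupted by buffer reuse.
def pairUp(input: Iterator[Array[Any]], copyRows: Boolean): Seq[(Seq[Any], Seq[Any])] = {
  val out = scala.collection.mutable.Buffer.empty[(Seq[Any], Seq[Any])]
  var pending: Array[Any] = null
  for (row <- input) {
    if (pending == null) {
      pending = if (copyRows) row.clone() else row // defensive copy of the buffered row
    } else {
      out += ((pending.toSeq, row.toSeq))
      pending = null
    }
  }
  out.toSeq
}

// An iterator that, like a reusing DSv2 scan, emits the same buffer each time.
def reusingIterator(rows: Seq[Seq[Any]]): Iterator[Array[Any]] = {
  val shared = new Array[Any](rows.head.length)
  rows.iterator.map { vs => vs.copyToArray(shared); shared }
}
```

With `copyRows = true` the first pair correctly holds the delete and the insert; with `copyRows = false` the buffered "delete" has already turned into the "insert" by the time it is read.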
```scala
 * @param dataOrdinals column indices for data column comparison (field-by-field equality)
 * @param schema the output schema for generic data column comparison
 */
class CarryOverIterator(
```
I know this is what's described in the proposal https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?tab=t.0
but I'm not convinced this whole machinery is really what we want
At a high-level, the way carry-overs are removed is by identifying rows that are deleted+inserted with the same row ID and row version.
The issue is that the proposal conflates row version with commit version: the version at which the row is deleted/inserted. With that definition, we can't distinguish between updates and rows being copied: both delete/insert get the current commit version as row version.
Delta & Iceberg actually mean something different by row version: the version at which the row was last modified - e.g. deleted rows don't get a new row version
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#row-commit-versions
https://iceberg.apache.org/spec/#row-lineage
This definition actually makes implementing carry-over removal straightforward: given a delete and insert for a given row ID, if the row versions are equal it's a copy; if they aren't, it's an update.
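The rule just stated can be sketched in a few lines (hypothetical `Change` case class, not the PR's types): a delete+insert pair sharing a row ID is dropped when the row versions match (carried-over copy) and relabeled when they differ (genuine update).

```scala
// Minimal model of a change row under Delta/Iceberg-style row-version
// semantics: rowVersion is the version at which the row was last modified.
final case class Change(rowId: Long, rowVersion: Long, changeType: String)

// Classify a delete+insert pair for one row ID:
// equal row versions  -> carry-over copy, drop both rows
// differing versions  -> real update, relabel as pre/post image
def classifyPair(delete: Change, insert: Change): Seq[Change] = {
  require(delete.rowId == insert.rowId, "pair must share a row ID")
  if (delete.rowVersion == insert.rowVersion) Seq.empty
  else Seq(delete.copy(changeType = "update_preimage"),
           insert.copy(changeType = "update_postimage"))
}
```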
We still need a concept of commit version though because:
- This is used to compute updates: inserts and deletes within the same commit would otherwise always show as delete+insert, since they have different row versions (insert = current commit version, delete = the earlier version at which the row was last modified)
- This is the API users query against when using version ranges: `CHANGES FROM VERSION 5 TO VERSION 10` means all rows modified in this commit range. Deleted rows don't get a new row version when they are deleted, so they wouldn't show up.
I think we should update ChangeLog to include both _commit_version and row_version.
Either explicitly:

```java
/**
 * Returns the column used for ordering changes within the same row identity, used for
 * removing carry-overs.
 * <p>
 * The default implementation throws {@link UnsupportedOperationException}. Connectors
 * that support update detection must override this method.
 */
default NamedReference rowVersion() {
  throw new UnsupportedOperationException("rowVersion is not supported.");
}

/**
 * Returns the column used for collecting rows within the same commit, used for
 * update detection.
 */
default NamedReference commitVersion() {
  return FieldReference.column("_commit_version");
}
```
or implicitly - we already require column `_commit_version` to be present in the output:

```java
/**
 * Returns the column used for ordering changes within the same row identity, used for
 * removing carry-overs.
 * <p>
 * The default implementation throws {@link UnsupportedOperationException}. Connectors
 * that support update detection must override this method.
 */
default NamedReference rowVersion() {
  throw new UnsupportedOperationException("rowVersion is not supported.");
}
```
```scala
if (options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
    changelog.containsCarryoverRows()) {
  updatedRel = injectCarryoverRemoval(rel, changelog)
}
if (options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()) {
  updatedRel = injectUpdateDetection(updatedRel, changelog)
}
if (options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES &&
    changelog.containsIntermediateChanges()) {
  updatedRel = injectNetChangeComputation(updatedRel, changelog)
}
```
I believe it should be possible to perform all post-processing in at most two passes.
The proposal mentions computing updates during carry-over removal
https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?tab=t.0#heading=h.6ylqglw1kcc2
I'm ok leaving that for a follow up though
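To make the combined-pass idea concrete, here is a hedged, non-streaming sketch of its semantics (hypothetical `Cdc` type; the real implementation would be a single sorted-iterator pass, not a `groupBy`): within each (rowId, commitVersion) group, a delete+insert pair is either dropped (identical data: carry-over) or relabeled as an update, so carry-over removal and update detection share one traversal.

```scala
// Hypothetical change row keyed by row identity and commit version.
final case class Cdc(rowId: Long, commitVersion: Long, changeType: String, data: String)

// Combined carry-over removal + update detection, expressed per group.
// groupBy is used only to keep the sketch short; a production operator
// would stream over input sorted by (rowId, commitVersion).
def postProcess(rows: Seq[Cdc]): Seq[Cdc] =
  rows.groupBy(r => (r.rowId, r.commitVersion)).toSeq.sortBy(_._1).flatMap {
    case (_, Seq(d, i)) if d.changeType == "delete" && i.changeType == "insert" =>
      if (d.data == i.data) Seq.empty // identical pair: carry-over, drop
      else Seq(d.copy(changeType = "update_preimage"),
               i.copy(changeType = "update_postimage")) // real update: relabel
    case (_, group) => group // lone inserts/deletes pass through
  }
```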
What changes were proposed in this pull request?
Building atop #54739 and #54738, this PR introduces changes to support Change Data Capture Queries given an underlying connector provides the expected data.
Important discussion posted to dev@spark.apache.org.
Added changes:
- `ResolveChangelogTable` analyzer rule: injects carry-over removal and/or update-detection plans above a resolved `DataSourceV2Relation(ChangelogTable)`.
- `CarryOverRemoval` logical node + `CarryOverRemovalExec` physical node + `CarryOverIterator`: sort-based removal of identical delete+insert CoW artifacts, keyed by rowId+rowVersion.
- Update detection: Window-based relabeling of delete+insert pairs within the same rowId+rowVersion partition to `update_preimage`/`update_postimage`.
- `ChangelogTable.validateSchema`: fail-fast validation of required CDC metadata columns, column types, and row-identity requirements.
- New error class `INVALID_CHANGELOG_SCHEMA` with four sub-classes (MISSING_COLUMN, INVALID_COLUMN_TYPE, MISSING_ROW_ID, NESTED_ROW_ID).
- `InMemoryChangelogCatalog` extended with `ChangelogProperties` to configure post-processing scenarios in tests.

Why are the changes needed?
Spark currently has the DSv2 CDC API and the SQL CHANGES clause, but the `computeUpdates` and `deduplicationMode` options are not acted upon. This PR wires the analyzer-side post-processing so connectors implementing Changelog can deliver real CDC feeds. See the discussion and linked SPIP.
Does this PR introduce any user-facing change?
For connectors implementing the Changelog API, the deduplicationMode and computeUpdates options on CHANGES FROM VERSION ... now take effect (carry-over removal by default, optional update detection). Previously these options were parsed but silently ignored.
Before / after example:
Given a `Changelog` connector that advertises `containsCarryoverRows = true`, `representsUpdateAsDeleteAndInsert = true`, and `containsIntermediateChanges = true`, with rowId `id` and rowVersion `_commit_version`.

Setup (raw changes emitted by the connector for versions 1–3):
Query:
Before this PR — no analyzer rule, all options are silently ignored; the user sees the raw emitted rows including carry-over noise and no update labeling regardless of `WITH (...)`:

After this PR, `WITH (computeUpdates = 'true')` (default `deduplicationMode = 'drop_carryovers'`) — carry-over removal and update detection apply:

After this PR, `WITH (deduplicationMode = 'netChanges')` — intermediate states are collapsed into the net effect per row over the whole range (`id=1` ends up as `Alex`; `id=3` was inserted and deleted within the range, so it disappears from the output):
How was this patch tested?
Tests: `ChangelogResolutionSuite` schema-validation cases and `ResolveChangelogTablePostProcessingSuite` end-to-end SQL coverage.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6 / 4.7