
[WIP][SPARK-55951][SPARK-55952][SPARK-55953][SQL] Add post-processing for CDC changelog tables #55426

Open
SanJSp wants to merge 2 commits into apache:master from SanJSp:SPARK-55668-Post-Process-for-CDC

Conversation


@SanJSp SanJSp commented Apr 20, 2026

Work still in progress:

  • Test suite cleanup
  • Adding net changes support

What changes were proposed in this pull request?

Building on #54739 and #54738, this PR adds support for Change Data Capture (CDC) queries, provided the underlying connector supplies the expected change data.
An important design discussion was posted to dev@spark.apache.org.
Added changes:

  • ResolveChangelogTable analyzer rule: injects carry-over removal and/or update-detection plans above a resolved DataSourceV2Relation(ChangelogTable).
  • CarryOverRemoval logical node + CarryOverRemovalExec physical node + CarryOverIterator: sort-based removal of identical delete+insert CoW artifacts, keyed by rowId+rowVersion.
  • Update detection: Window-based relabeling of delete+insert pairs within the same rowId+rowVersion partition to update_preimage/update_postimage.
  • ChangelogTable.validateSchema: fail-fast validation of required CDC metadata columns, column types, and row-identity requirements.
  • New error class INVALID_CHANGELOG_SCHEMA with four sub-classes (MISSING_COLUMN, INVALID_COLUMN_TYPE, MISSING_ROW_ID, NESTED_ROW_ID).
  • InMemoryChangelogCatalog extended with ChangelogProperties to configure post-processing scenarios in tests.
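The carry-over removal step performed by CarryOverIterator can be illustrated with a small, self-contained sketch (plain Python, not the actual Spark implementation): rows arrive sorted by (rowId, rowVersion), and a delete immediately followed by an insert with the same key and identical data columns is a copy-on-write artifact, so both rows are dropped.

```python
def remove_carryovers(rows):
    # Illustrative sketch only. Rows are (row_id, row_version, change_type,
    # data) tuples, assumed sorted by (row_id, row_version). A delete and an
    # insert sharing row_id, row_version, and identical data columns are a
    # copy-on-write rewrite artifact; both rows are dropped.
    out = []
    pending_delete = None  # buffered delete awaiting a possible matching insert
    for row in rows:
        row_id, row_version, change_type, data = row
        if pending_delete is not None:
            d_id, d_ver, _, d_data = pending_delete
            if change_type == "insert" and (row_id, row_version, data) == (d_id, d_ver, d_data):
                pending_delete = None  # carry-over pair: drop both rows
                continue
            out.append(pending_delete)  # unmatched delete is a real change
            pending_delete = None
        if change_type == "delete":
            pending_delete = row  # buffer; the matching insert may follow
        else:
            out.append(row)
    if pending_delete is not None:
        out.append(pending_delete)
    return out
```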

Why are the changes needed?

Spark currently has the DSv2 CDC API and the SQL CHANGES clause, but the computeUpdates and deduplicationMode options are not acted upon. This PR wires up the analyzer-side post-processing so connectors implementing Changelog can deliver real CDC feeds.
See the discussion and linked SPIP.

Does this PR introduce any user-facing change?

For connectors implementing the Changelog API, the deduplicationMode and computeUpdates options on CHANGES FROM VERSION ... now take effect (carry-over removal by default, optional update detection). Previously these options were parsed but silently ignored.

Before / after example

Given a Changelog connector that advertises
containsCarryoverRows = true, representsUpdateAsDeleteAndInsert = true,
and containsIntermediateChanges = true, with rowId id and rowVersion
_commit_version:

Setup (raw changes emitted by the connector for versions 1–3):

id | name   | _change_type | _commit_version
---+--------+--------------+----------------
 1 | Alice  | insert       | 1    -- initial insert
 2 | Bob    | insert       | 1    -- initial insert
 3 | Carol  | insert       | 1    -- initial insert
 1 | Alice  | delete       | 2    -- rename Alice -> Alicia
 1 | Alicia | insert       | 2    -- rename Alice -> Alicia
 2 | Bob    | delete       | 2    -- carry-over (row unchanged, CoW rewrite)
 2 | Bob    | insert       | 2    -- carry-over (row unchanged, CoW rewrite)
 3 | Carol  | delete       | 2    -- real delete
 1 | Alicia | delete       | 3    -- rename Alicia -> Alex
 1 | Alex   | insert       | 3    -- rename Alicia -> Alex

Query:

SELECT id, name, _change_type, _commit_version
  FROM cat.people CHANGES FROM VERSION 1 TO VERSION 3
  WITH (<options>);

Before this PR — no analyzer rule, all options are silently ignored;
user sees the raw emitted rows including carry-over noise and no
update labeling regardless of WITH (...):

 1 | Alice  | insert       | 1
 2 | Bob    | insert       | 1
 3 | Carol  | insert       | 1
 1 | Alice  | delete       | 2
 1 | Alicia | insert       | 2
 2 | Bob    | delete       | 2    -- noise
 2 | Bob    | insert       | 2    -- noise
 3 | Carol  | delete       | 2
 1 | Alicia | delete       | 3
 1 | Alex   | insert       | 3

After this PR, WITH (computeUpdates = 'true')
(default deduplicationMode = 'drop_carryovers') — carry-over removal and
update detection apply:

 1 | Alice  | insert           | 1
 2 | Bob    | insert           | 1
 3 | Carol  | insert           | 1
 1 | Alice  | update_preimage  | 2
 1 | Alicia | update_postimage | 2
 3 | Carol  | delete           | 2
 1 | Alicia | update_preimage  | 3
 1 | Alex   | update_postimage | 3

After this PR, WITH (deduplicationMode = 'netChanges') — intermediate
states are collapsed into the net effect per row over the whole range
(id=1 ends up as Alex; id=3 was inserted and deleted within the range,
so it disappears from the output):

 1 | Alex   | insert       | 3
 2 | Bob    | insert       | 1
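The net-change collapse shown above can be sketched in plain Python (assumed semantics, not the actual implementation, since net-changes support is still in progress; the labels for rows that pre-existed the range are an assumption of this sketch):

```python
from itertools import groupby

def net_changes(rows):
    # Illustrative sketch. Rows are (row_id, row_version, change_type, data),
    # sorted by (row_id, row_version), with carry-overs already removed.
    # Only the net effect per row over the queried range is kept.
    out = []
    for row_id, group in groupby(rows, key=lambda r: r[0]):
        changes = list(group)
        first, last = changes[0], changes[-1]
        if first[2] == "insert" and last[2] == "delete":
            continue  # created and removed inside the range: no net effect
        if first[2] == "insert":
            # row was created in the range: net insert of its final state
            out.append((row_id, last[1], "insert", last[3]))
        elif last[2] == "insert":
            # row pre-existed the range and survived: net update (assumed label)
            out.append((row_id, last[1], "update_postimage", last[3]))
        else:
            # row pre-existed the range and was removed: net delete
            out.append((row_id, last[1], "delete", last[3]))
    return out
```

Applied to the example data (after carry-over removal), this yields exactly the two-row output in the table above.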

How was this patch tested?

Tests: ChangelogResolutionSuite schema-validation cases and ResolveChangelogTablePostProcessingSuite end-to-end SQL coverage.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.6 / 4.7

SanJSp added 2 commits April 20, 2026 12:13
Add analyzer rule and execution primitives for Spark-side CDC post-processing:

- `ResolveChangelogTable` analyzer rule: injects carry-over removal and/or
  update-detection plans above a resolved `DataSourceV2Relation(ChangelogTable)`.
- `CarryOverRemoval` logical node + `CarryOverRemovalExec` physical node +
  `CarryOverIterator`: sort-based removal of identical delete+insert CoW
  artifacts, keyed by rowId+rowVersion.
- Update detection: Window-based relabeling of delete+insert pairs within the
  same rowId+rowVersion partition to `update_preimage`/`update_postimage`.
- `ChangelogTable.validateSchema`: fail-fast validation of required CDC
  metadata columns, column types, and row-identity requirements.
- New error class `INVALID_CHANGELOG_SCHEMA` with four sub-classes
  (MISSING_COLUMN, INVALID_COLUMN_TYPE, MISSING_ROW_ID, NESTED_ROW_ID).
- `InMemoryChangelogCatalog` extended with `ChangelogProperties` to configure
  post-processing scenarios in tests.
- Tests: `ChangelogResolutionSuite` schema-validation cases and
  `ResolveChangelogTablePostProcessingSuite` end-to-end SQL coverage.
Comment on lines +196 to +197
val rowIdColumnNames = cl.rowId().map(_.fieldNames()(0)).toSeq
val rowVersionColumnName = cl.rowVersion().fieldNames()(0) // e.g. "_commit_version"
Author


We pass column names and resolve them to ordinals at execution time in CarryOverRemovalExec.doExecute(). Computing ordinals at analysis time was tried first but broke: ColumnPruning and projection rewrites between analysis and physical planning can reorder or re-number columns, so analysis-time ordinals pointed at the wrong fields by the time the exec node ran. Resolving names against child.output at execute time avoids that.
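A minimal sketch of this name-to-ordinal pattern (illustrative Python with hypothetical names, not Spark's actual API): names chosen at analysis time are resolved against the child's execute-time output, so rewrites between planning phases cannot invalidate them.

```python
def resolve_ordinals(column_names, child_output):
    # child_output: attribute names of the child plan's execute-time output.
    # Resolving here, rather than at analysis time, tolerates any column
    # reordering introduced by optimizer rules in between.
    name_to_idx = {name: i for i, name in enumerate(child_output)}
    try:
        return [name_to_idx[name] for name in column_names]
    except KeyError as missing:
        raise ValueError(f"column {missing} not found in child output")
```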

  }
} else {
  // pendingDelete != null && input.hasNext
  val nextRow = input.next().copy()
Author


Input rows may be mutable/reused by the upstream iterator. Any row buffered for comparison against the next row is defensively copied via .copy(). This is the standard DSv2 pattern, but the per-row copy cost is non-trivial on large scans. Confirm whether the upstream stage here guarantees non-reused rows (in which case we could drop the copy), or whether we should keep the defensive copy.

* @param dataOrdinals column indices for data column comparison (field-by-field equality)
* @param schema the output schema for generic data column comparison
*/
class CarryOverIterator(
Contributor

@johanl-db johanl-db Apr 20, 2026


I know this is what's described in the proposal https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?tab=t.0
but I'm not convinced this whole machinery is really what we want.

At a high-level, the way carry-overs are removed is by identifying rows that are deleted+inserted with the same row ID and row version.

The issue is that the proposal conflates row version with commit version: the version at which the row is deleted/inserted. With that definition, we can't distinguish between updates and rows being copied: both delete/insert get the current commit version as row version.

Delta & Iceberg actually mean something different by row version: the version at which the row was last modified; e.g. deleted rows don't get a new row version
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#row-commit-versions
https://iceberg.apache.org/spec/#row-lineage

This definition actually makes implementing carry-over removal straightforward: given a delete and insert for a given row ID, if the row versions are equal it's a copy; if they aren't, it's an update.
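The rule described above can be stated as a tiny sketch (illustrative Python, under the Delta/Iceberg semantics where row version = the commit at which the row was last modified, so deletes keep the old row version):

```python
def classify_pair(delete_row_version, insert_row_version):
    # Classify a delete+insert pair sharing the same row ID.
    if delete_row_version == insert_row_version:
        return "copy"    # carry-over: row was rewritten unchanged
    return "update"      # row content actually changed
```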

We still need a concept of commit version though because:

  1. This is used to compute updates: inserts and deletes within the same commit would otherwise always show as delete+insert, since they have different row versions (insert = current commit version, delete = the earlier commit version at which the row was last modified).
  2. This is the API users query against when using version ranges: CHANGES FROM VERSION 5 TO VERSION 10 means all rows modified in this commit range. Deleted rows don't get a new row version when they are deleted, so they wouldn't show up.

I think we should update ChangeLog to include both _commit_version and row_version.
Either explicitly:

  /**
   * Returns the column used for ordering changes within the same row identity, used for
   * removing carry-overs.
   * <p>
   * The default implementation throws {@link UnsupportedOperationException}. Connectors
   * that support update detection must override this method.
   */
  default NamedReference rowVersion() {
    throw new UnsupportedOperationException("rowVersion is not supported.");
  }

  /**
   * Returns the column used for collecting rows within the same commit, used for
   * update detection.
   */
  default NamedReference commitVersion() {
    return FieldReference.column("_commit_version");
  }

or implicitly - we already require column _commit_version to be present in the output:

  /**
   * Returns the column used for ordering changes within the same row identity, used for
   * removing carry-overs.
   * <p>
   * The default implementation throws {@link UnsupportedOperationException}. Connectors
   * that support update detection must override this method.
   */
  default NamedReference rowVersion() {
    throw new UnsupportedOperationException("rowVersion is not supported.");
  }

Comment on lines +52 to +62
if (options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
    changelog.containsCarryoverRows()) {
  updatedRel = injectCarryoverRemoval(rel, changelog)
}
if (options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()) {
  updatedRel = injectUpdateDetection(updatedRel, changelog)
}
if (options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES &&
    changelog.containsIntermediateChanges()) {
  updatedRel = injectNetChangeComputation(updatedRel, changelog)
}
Contributor


I believe it should be possible to perform all post-processing in at most two passes.
The proposal mentions computing updates during carry-over removal
https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?tab=t.0#heading=h.6ylqglw1kcc2

I'm OK leaving that for a follow-up, though
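A sketch of how carry-over removal and update detection could share one pass (illustrative Python, assumed semantics, not the Spark implementation): while scanning rows sorted by (rowId, rowVersion), an adjacent delete+insert pair with the same key is either dropped (identical data: carry-over) or relabeled as update_preimage/update_postimage (differing data), avoiding the separate Window pass.

```python
def postprocess_one_pass(rows):
    # Rows are (row_id, row_version, change_type, data), sorted by
    # (row_id, row_version). One pass both removes carry-overs and
    # relabels genuine updates.
    out = []
    pending = None  # buffered delete awaiting a possible matching insert
    for row in rows:
        row_id, version, change_type, data = row
        if pending is not None:
            p_id, p_ver, _, p_data = pending
            if change_type == "insert" and (row_id, version) == (p_id, p_ver):
                if data != p_data:
                    # same key, different content: an update split into delete+insert
                    out.append((p_id, p_ver, "update_preimage", p_data))
                    out.append((row_id, version, "update_postimage", data))
                # identical content: carry-over, drop both rows
                pending = None
                continue
            out.append(pending)  # unmatched delete is a real delete
            pending = None
        if change_type == "delete":
            pending = row
        else:
            out.append(row)
    if pending is not None:
        out.append(pending)
    return out
```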

2 participants