MERGE assumes one source row per target key — multiple sources break that contract
Delta Lake's MERGE INTO operation follows ANSI SQL merge semantics with one non-negotiable constraint: for any given target row matched by the ON clause, exactly one source row may attempt to modify it. When you combine multiple source tables — say, a CDC stream from an operational database and a correction feed from a reconciliation system — and both contain a row with the same business key, Spark throws:
```
DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE
Cannot perform Merge as multiple source rows matched and attempted to update the same target row in the Delta table.
```
This is not a warning. The entire MERGE transaction aborts. No rows are written.
The error emerges from how Spark executes the merge internally. It performs a full outer join between source and target on the ON condition, then applies WHEN MATCHED / WHEN NOT MATCHED logic. If two source rows join to one target row, Spark cannot determine which source row's values should win the update. Rather than pick arbitrarily — which would violate determinism guarantees in Delta's transaction log — it fails loudly.
The pattern that triggers this most often: combining tables with UNION ALL where both tables contain the same key but with different timestamps, versions, or correction flags. Each table alone would merge cleanly. Combined naively, they collide.
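To see the collision concretely, here is a minimal sketch in PySpark; the table and column names are illustrative, and target_delta stands in for any Delta table keyed on id:

```python
# Two sources that each carry the business key id = 1.
# Merged separately, each would succeed; unioned, they collide.
source_a = spark.createDataFrame([(1, "Alice", "2024-01-01")], ["id", "name", "updated_at"])
source_b = spark.createDataFrame([(1, "Alice B.", "2024-01-02")], ["id", "name", "updated_at"])

source_a.union(source_b).createOrReplaceTempView("combined_source")

# Fails with DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE
# whenever target_delta already contains a row with id = 1.
spark.sql("""
    MERGE INTO target_delta AS t
    USING combined_source AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```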
CTEs with UNION ALL work — but only after you collapse duplicates
The syntactically obvious approach is wrapping your multiple sources in a CTE:
```sql
WITH combined_source AS (
  SELECT id, name, updated_at FROM source_table_a
  UNION ALL
  SELECT id, name, updated_at FROM source_table_b
)
MERGE INTO target_delta AS t
USING combined_source AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```
This compiles. It may even succeed in testing when your two sources happen to have non-overlapping keys. In production, the moment both sources contain the same id, it fails.
The fix is deduplication within the CTE itself:
```sql
WITH combined_source AS (
  SELECT id, name, updated_at
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) AS rn
    FROM (
      SELECT id, name, updated_at FROM source_table_a
      UNION ALL
      SELECT id, name, updated_at FROM source_table_b
    ) unioned
  ) ranked
  WHERE rn = 1
)
MERGE INTO target_delta AS t
USING combined_source AS s
ON t.id = s.id
WHEN MATCHED AND s.updated_at > t.updated_at THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```
The ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) ensures exactly one row per merge key. The ORDER BY updated_at DESC implements a last-writer-wins policy — you decide the tiebreaker column based on your business logic. For CDC scenarios, this is typically a sequence number or transaction timestamp.
Note the additional guard AND s.updated_at > t.updated_at in the WHEN MATCHED clause. This prevents stale source rows from overwriting fresher target data when the deduplication window doesn't perfectly align with the target's state.
The PySpark DataFrame approach introduces a different failure mode
If you build your merge in PySpark using the DeltaTable API, the pattern shifts:
```python
from delta.tables import DeltaTable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Combine sources
combined = source_a.unionByName(source_b, allowMissingColumns=True)

# Deduplicate
window = Window.partitionBy("id").orderBy(col("updated_at").desc())
deduped = combined.withColumn("rn", row_number().over(window)).filter("rn = 1").drop("rn")

# Merge
target = DeltaTable.forName(spark, "catalog.schema.target_delta")
target.alias("t").merge(
    deduped.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```
The unionByName with allowMissingColumns=True handles schema differences between your sources — one table may have columns the other lacks. Without this flag, you get an AnalysisException about mismatched column counts before the merge even starts.
However, the DataFrame path introduces a subtler failure mode: if either source DataFrame is lazily evaluated from a streaming source or a view that changes between the plan and execution phases, Spark may re-evaluate the source mid-merge. This can reintroduce duplicates that weren't present during the deduplication step. The safeguard is calling .cache() or .checkpoint() on the deduped DataFrame before passing it to the merge — forcing materialization of the deduplicated result.
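A minimal sketch of that safeguard, reusing deduped and target from the example above:

```python
# Pin the deduplicated result in memory and force evaluation with an
# action, so the merge operates on a stable snapshot of the source.
deduped = deduped.cache()
deduped.count()

target.alias("t").merge(
    deduped.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

deduped.unpersist()  # release the cached copy once the merge commits
```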
For large-scale merges (source exceeding 100M rows), consider writing the deduped source to a temporary Delta table first, then merging from that materialized table. The extra write eliminates re-computation risk and gives you an audit trail of exactly what was merged.
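One way to stage it, assuming a scratch schema you control (the staging table name below is hypothetical):

```python
# Materialize the deduplicated source as a real Delta table, then merge
# from the re-read table: no re-computation risk, plus an audit trail.
staging = "catalog.schema.merge_staging"  # hypothetical scratch table
deduped.write.format("delta").mode("overwrite").saveAsTable(staging)

staged = spark.read.table(staging)
target.alias("t").merge(
    staged.alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```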
The ON clause determines collision scope — narrow it carefully
A common mistake when merging from multiple sources is using an ON clause that's too broad. Consider a scenario where source_a provides customer address updates and source_b provides customer email updates. Both share customer_id as the business key.
If your ON clause is simply t.customer_id = s.customer_id, then any customer appearing in both sources triggers the multiple-match error. But these sources update different columns — they shouldn't conflict.
Two architectures solve this without deduplication:
Sequential merges: Run two separate MERGE statements, one per source. Each merge is atomic. The second merge sees the state left by the first.
```sql
-- First: address updates
MERGE INTO customers AS t
USING address_updates AS s
ON t.customer_id = s.customer_id
WHEN MATCHED THEN UPDATE SET t.address = s.address, t.city = s.city;

-- Second: email updates
MERGE INTO customers AS t
USING email_updates AS s
ON t.customer_id = s.customer_id
WHEN MATCHED THEN UPDATE SET t.email = s.email;
```
Composite key with source discriminator: Add a source identifier to the merge key so rows from different sources never match the same target row in a single pass. This only works if your target table models each source's contribution as a separate row — a star schema pattern where dimension updates arrive from multiple systems.
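A sketch of that discriminator pattern, assuming the target models one row per (customer_id, source_system) pair; the source_system values are illustrative:

```python
from pyspark.sql.functions import lit

# Tag each source's rows so the ON clause can tell them apart.
tagged = (
    source_a.withColumn("source_system", lit("crm"))
    .unionByName(source_b.withColumn("source_system", lit("billing")),
                 allowMissingColumns=True)
)
tagged.createOrReplaceTempView("tagged_source")

# A crm row and a billing row can never match the same target row,
# so both sources merge safely in a single pass.
spark.sql("""
    MERGE INTO customers AS t
    USING tagged_source AS s
    ON t.customer_id = s.customer_id AND t.source_system = s.source_system
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```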
Sequential merges are almost always the right choice when sources update non-overlapping column sets. They're simpler to debug, each produces its own Delta transaction log entry, and a failure in one doesn't roll back the other. The cost is two passes over the target table — but Delta's data skipping and Z-ordering minimize that overhead on well-partitioned tables.
Scheduled jobs silently succeed with zero rows merged
There's a failure mode worse than a loud error: a MERGE that succeeds but merges nothing. This happens when the deduplication or guard logic inadvertently excludes every row: perhaps the updated_at column contains nulls, so the s.updated_at > t.updated_at guard evaluates to null and no matched row ever qualifies (ROW_NUMBER's ranking of null timestamps also depends on null ordering, which Spark defaults to nulls-last for descending sorts), or schema drift between sources causes the UNION ALL to silently cast values to null.
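If last-writer-wins is the policy, making the null ordering explicit in the dedup window removes at least that ambiguity; a sketch against the PySpark example from earlier:

```python
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# desc_nulls_last makes the tiebreak explicit: a row with a null
# updated_at loses to any row carrying a real timestamp.
window = Window.partitionBy("id").orderBy(col("updated_at").desc_nulls_last())
deduped = (
    combined.withColumn("rn", row_number().over(window))
    .filter("rn = 1")
    .drop("rn")
)
```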
A Databricks job that runs this merge nightly will report success. The exit code is 0. The Delta transaction log shows a commit with zero rows affected. Your dashboards show yesterday's data. Nobody notices until a stakeholder asks why the numbers haven't moved.
Defensive patterns:
```python
# Read the operation metrics from the most recent Delta commit
metrics = target.history(1).select("operationMetrics").collect()[0][0]
rows_updated = int(metrics.get("numTargetRowsUpdated", 0))
rows_inserted = int(metrics.get("numTargetRowsInserted", 0))

if rows_updated + rows_inserted == 0:
    raise ValueError("MERGE produced zero changes — source may be empty or fully filtered")
```
This check runs after the merge completes and raises an explicit error that your orchestrator (Databricks Workflows, ADF, Airflow) will catch and escalate. Without it, zero-row merges pass silently through any retry or alerting logic that only watches for exceptions.
MetricSign monitors Databricks job outcomes and flags runs where the job succeeded but downstream data freshness stalled — the refresh_delayed signal catches exactly this scenario, correlating the successful job completion with the absence of new data arriving in dependent datasets.
Schema evolution across sources compounds the problem
When your multiple source tables don't share identical schemas — one has a new loyalty_tier column the other lacks — you face a decision at merge time. Setting spark.databricks.delta.schema.autoMerge.enabled = true allows the MERGE to add new columns to the target automatically. But this interacts poorly with UNION ALL.
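Enabling it is a session-level setting:

```python
# Allow MERGE to add source columns missing from the target schema.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```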
If source_a has loyalty_tier and source_b doesn't, UNION ALL requires matching column counts. You either:
- Use unionByName(allowMissingColumns=True) in PySpark, which fills missing columns with null
- Explicitly add the column in SQL: SELECT id, name, updated_at, NULL AS loyalty_tier FROM source_table_b
Both approaches introduce null values for the missing column in rows from source_b. If your deduplication picks a source_b row as the winner (because it has a newer updated_at), it writes null into loyalty_tier — overwriting a valid value that might already exist in the target.
The fix is a merge condition that respects column provenance:
```sql
WHEN MATCHED THEN UPDATE SET
  t.name = s.name,
  t.updated_at = s.updated_at,
  t.loyalty_tier = COALESCE(s.loyalty_tier, t.loyalty_tier)
```
The COALESCE preserves the target's existing value when the source row carries a null — a pattern sometimes called merge-without-clobber. It trades the convenience of UPDATE SET * for explicit column control, but prevents data loss from schema mismatches across sources.
For pipelines that evolve frequently, maintaining an explicit column list in the MERGE statement also serves as documentation — you can see exactly which columns each job touches, making impact analysis straightforward when upstream schemas change.