MetricSign
NL|ENStart free →
Best Practices8 min·

Delta MERGE vanuit meerdere brontabellen mislukt wanneer je de deduplicatie overslaat.

Als je al je bronnen samenvoegt tot één bron, zal Spark je straffen met een foutmelding over een onduidelijke overeenkomst, tenzij je eerst de duplicaten verwijdert.

Read this article in English →

MERGE gaat ervan uit dat er één bronrij per doelsleutel is; meerdere bronnen schenden dit principe.

De MERGE INTO-bewerking van Delta Lake volgt de ANSI SQL-merge-semantiek met één niet-onderhandelbare beperking: voor elke doelrij die overeenkomt met de ON-clausule, mag precies één bronrij proberen deze te wijzigen. Wanneer je meerdere brontabellen combineert – bijvoorbeeld een CDC-stream van een operationele database en een correctiefeed van een reconciliatiesysteem – en beide een rij met dezelfde business key bevatten, geeft Spark de volgende foutmelding:

``` DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE Kan de merge niet uitvoeren omdat meerdere bronrijen overeenkomen en hebben geprobeerd dezelfde doelrij in de Delta-tabel bij te werken.

```

Dit is geen waarschuwing. De volledige MERGE-transactie wordt afgebroken. Er worden geen rijen geschreven.

De fout komt voort uit de manier waarop Spark de merge intern uitvoert. Het voert een volledige outer join uit tussen bron en doel op basis van de ON-conditie en past vervolgens de WHEN MATCHED / WHEN NOT MATCHED-logica toe. Als twee bronrijen worden samengevoegd met één doelrij, kan Spark niet bepalen welke bronrij de update moet krijgen. In plaats van willekeurig te kiezen – wat de determinisme-garanties in het transactielogboek van Delta zou schenden – geeft het een duidelijke foutmelding.

Het patroon dat dit het vaakst veroorzaakt, is het combineren van tabellen met UNION ALL, waarbij beide tabellen dezelfde sleutel bevatten, maar met verschillende tijdstempels, versies of correctievlaggen. Elke tabel afzonderlijk zou probleemloos worden samengevoegd. Wanneer ze op een naïeve manier worden gecombineerd, ontstaan er conflicten.

CTE's met UNION ALL werken, maar pas nadat je duplicaten hebt samengevoegd.

De meest voor de hand liggende syntactische aanpak is om je meerdere bronnen in een CTE te verpakken:

```sql WITH combined_source AS (

SELECT id, name, updated_at FROM source_table_a

UNION ALL

SELECT id, name, updated_at FROM source_table_b ) MERGE INTO target_delta AS t USING combined_source AS s ON t.id = s.id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * ```

Dit compileert. Het kan zelfs lukken tijdens het testen als je twee bronnen toevallig niet-overlappende sleutels hebben. In een productieomgeving zal het echter mislukken zodra beide bronnen dezelfde id bevatten.

De oplossing is het verwijderen van duplicaten binnen de CTE zelf:

```sql WITH combined_source AS (

SELECT id, name, updated_at

FROM (

SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) AS rn

FROM (

SELECT id, name, updated_at FROM source_table_a

UNION ALL

SELECT id, name, updated_at FROM source_table_b

)

)

WHERE rn = 1 ) MERGE INTO target_delta AS t USING combined_source AS s ON t.id = s.id WHEN MATCHED AND s.updated_at > t.updated_at THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * ```

De ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) zorgt ervoor dat er precies één rij wordt verwijderd. rij per samenvoegsleutel. De ORDER BY updated_at DESC implementeert een beleid waarbij de laatst bijgewerkte rij wint — je bepaalt de tiebreaker-kolom op basis van je bedrijfslogica. Voor CDC-scenario's is dit doorgaans een volgnummer of transactietijdstempel.

Let op de extra controle AND s.updated_at > t.updated_at in de WHEN MATCHED-clausule. Dit voorkomt dat verouderde bronrijen recentere doelgegevens overschrijven wanneer het deduplicatievenster niet perfect overeenkomt met de status van het doel.

Multi-source MERGE decision tree Multiple source tables identified Do sources share merge keys? YES: Combine with UNION ALL + ROW_NUMBER() NO: Do sources update overlapping columns? YES overlapping: Single MERGE with deduped CTE NO overlapping: Sequential MERGE per source Validate operationMetrics > 0 rows affected Commit succeeds
Multi-source MERGE decision tree

De PySpark DataFrame-aanpak introduceert een andere faalmodus.

Als je je samenvoeging in PySpark bouwt met behulp van de DeltaTable API, verandert het patroon:

```python from delta.tables import DeltaTable from pyspark.sql.window import Window from pyspark.sql.functions import row_number, col

# Bronnen combineren combined = source_a.unionByName(source_b, allowMissingColumns=True)

# Duplicaten verwijderen window = Window.partitionBy("id").orderBy(col("updated_at").desc()) deduped = combined.withColumn("rn", row_number().over(window)).filter("rn = 1").drop("rn")

# Samenvoegen target = DeltaTable.forName(spark, "catalog.schema.target_delta") target.alias("t").merge(

deduped.alias("s"),

"t.id = s.id" ) ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() ```

De unionByName met allowMissingColumns=True behandelt schemaverschillen tussen je bronnen — de ene tabel kan kolommen bevatten die de andere mist. Zonder deze vlag krijgt je een AnalysisException over niet-overeenkomende kolomaantallen nog voordat de samenvoeging begint.

Het DataFrame-pad introduceert echter een subtielere foutmodus: als een van de bron-DataFrames lui wordt geëvalueerd vanuit een streaming bron of een weergave die verandert tussen de plan- en uitvoeringsfasen, kan Spark de bron halverwege de samenvoeging opnieuw evalueren. Dit kan duplicaten opnieuw introduceren die niet aanwezig waren tijdens de deduplicatiestap. De beveiliging is het aanroepen van .cache() of .checkpoint() op het gededupliceerde DataFrame voordat het aan de samenvoeging wordt doorgegeven — waardoor materialisatie van het gededupliceerde resultaat wordt afgedwongen.

Bij grootschalige samenvoegingen (brongegevens met meer dan 100 miljoen rijen) is het raadzaam om de ontdubbelde brongegevens eerst naar een tijdelijke Delta-tabel te schrijven en vervolgens vanuit die tabel samen te voegen. Deze extra schrijfbewerking elimineert het risico op herberekeningen en biedt een auditspoor van wat er precies is samengevoegd.

De ON-clausule bepaalt de reikwijdte van de botsing; beperk deze zorgvuldig.

Een veelgemaakte fout bij het samenvoegen van gegevens uit meerdere bronnen is het gebruik van een te brede ON-clausule. Stel je voor dat bron_a klantadresupdates levert en bron_b klant-e-mailupdates. Beide bronnen delen customer_id als zakelijke sleutel.

Als je ON-clausule simpelweg t.customer_id = s.customer_id is, dan veroorzaakt elke klant die in beide bronnen voorkomt een foutmelding voor meerdere overeenkomsten. Maar deze bronnen werken verschillende kolommen bij – ze zouden geen conflict mogen veroorzaken.

Twee architecturen lossen dit op zonder deduplicatie:

Sequentiële samenvoegingen: Voer twee afzonderlijke MERGE-instructies uit, één per bron. Elke samenvoeging is atomair. De tweede samenvoeging ziet de status die de eerste heeft achtergelaten.

```sql -- Eerst: adresupdates MERGE INTO customers AS t USING address_updates AS s ON t.customer_id = s.customer_id WHEN MATCHED THEN UPDATE SET t.address = s.address, t.city = s.city;

-- Ten tweede: e-mailupdates MERGE INTO customers AS t USING email_updates AS s ON t.customer_id = s.customer_id WHEN MATCHED THEN UPDATE SET t.email = s.email;

```

Samengestelde sleutel met brondiscriminator: Voeg een bron-ID toe aan de samenvoegsleutel, zodat rijen uit verschillende bronnen nooit in één keer overeenkomen met dezelfde doelrij. Dit werkt alleen als je doeltabel de bijdrage van elke bron als een aparte rij modelleert — een sterschema-patroon waarbij dimensie-updates uit meerdere systemen komen.

Sequentiële samenvoegingen zijn vrijwel altijd de juiste keuze wanneer bronnen niet-overlappende kolomsets bijwerken. Ze zijn eenvoudiger te debuggen, elke samenvoeging genereert een eigen Delta-transactielogboekvermelding en een fout in de ene samenvoeging zorgt er niet voor dat de andere wordt teruggedraaid. De kosten bestaan uit twee doorgangen over de doeltabel, maar Delta's data-skipping en Z-ordering minimaliseren die overhead bij goed gepartitioneerde tabellen.

Geplande taken worden stilzwijgend succesvol afgerond zonder dat er rijen zijn samengevoegd.

Er bestaat een foutmodus die nog erger is dan een luide foutmelding: een MERGE-bewerking die slaagt, maar niets samenvoegt. Dit gebeurt wanneer de deduplicatie logica per ongeluk alle rijen filtert — bijvoorbeeld omdat de kolom updated_at null-waarden bevat (die ROW_NUMBER onvoorspelbaar behandelt, afhankelijk van de volgorde van de null-waarden), of omdat schemaverschillen tussen bronnen ervoor zorgen dat UNION ALL stilletjes waarden naar null converteert.

Een Databricks-taak die deze samenvoeging 's nachts uitvoert, zal een succes melden. De exitcode is 0. Het Delta-transactielogboek toont een commit met nul beïnvloede rijen. je dashboards tonen de gegevens van gisteren. Niemand merkt het op totdat een belanghebbende vraagt waarom de cijfers niet zijn veranderd.

Defensieve patronen:

```python metrics = target.history(1).select("operationMetrics").collect()[0][0] rows_updated = int(metrics.get("numTargetRowsUpdated", 0)) rows_inserted = int(metrics.get("numTargetRowsInserted", 0))

if rows_updated + rows_inserted == 0:

raise ValueError(f"MERGE produced zero changes — source may be empty or fully filtered") ```

Deze controle wordt uitgevoerd nadat de samenvoeging is voltooid en genereert een expliciete foutmelding die je orchestrator (Databricks Workflows, ADF, Airflow) zal opvangen en doorgeven. Zonder deze controle worden samenvoegingen met nul rijen stilzwijgend door elke logica voor herhaling of waarschuwingen geleid die alleen op uitzonderingen let.

MetricSign monitort de resultaten van Databricks-taken en markeert uitvoeringen waarbij de taak weliswaar succesvol was, maar de gegevensstroom naar de afhankelijke datasets niet meer actueel was. Het signaal refresh_delayed detecteert precies dit scenario en legt een verband tussen de succesvolle voltooiing van de taak en het uitblijven van nieuwe gegevens in de afhankelijke datasets.

De evolutie van schema's tussen verschillende bronnen verergert het probleem.

Wanneer je meerdere brontabellen niet identieke schema's hebben — de ene heeft een nieuwe kolom loyalty_tier die de andere mist — staat je voor een keuze tijdens het samenvoegen. Door spark.databricks.delta.schema.autoMerge.enabled = true in te stellen, kan de MERGE-bewerking automatisch nieuwe kolommen aan de doeltabel toevoegen. Dit werkt echter niet goed samen met UNION ALL.

Als brontabel a de kolom loyalty_tier heeft en brontabel b niet, vereist UNION ALL dat het aantal kolommen overeenkomt. Je kunt dan twee dingen doen:

  1. Gebruik unionByName(allowMissingColumns=True) in PySpark, waarmee ontbrekende kolommen met null worden aangevuld.
  2. Voeg de kolom expliciet toe in SQL: SELECT id, name, updated_at, NULL AS loyalty_tier FROM source_table_b

Beide benaderingen introduceren null-waarden voor de ontbrekende kolom in rijen uit brontabel b. Als je deduplicatie een rij in source_b als winnaar kiest (omdat deze een nieuwere updated_at heeft), schrijft deze null naar loyalty_tier — waarmee een geldige waarde die mogelijk al in het doel bestaat, wordt overschreven.

De oplossing is een samenvoegingsvoorwaarde die rekening houdt met de herkomst van de kolom:

```sql WHEN MATCHED THEN UPDATE SET

t.name = s.name,

t.updated_at = s.updated_at,

t.loyalty_tier = COALESCE(s.loyalty_tier, t.loyalty_tier) ```

De COALESCE behoudt de bestaande waarde in het doel wanneer de bronrij een null-waarde bevat — een patroon dat soms 'samenvoegen zonder overschrijven' wordt genoemd. Het ruilt het gemak van UPDATE SET * in voor expliciete kolomcontrole, maar voorkomt gegevensverlies door schema-inconsistenties tussen bronnen.

Voor pipelines die regelmatig veranderen, dient het bijhouden van een expliciete kolomlijst in de MERGE-instructie ook als documentatie: je kunt precies zien welke kolommen elke taak aanraakt, waardoor impactanalyse eenvoudig wordt wanneer upstream-schema's wijzigen.

Veelgestelde vragen

Kan ik een JOIN gebruiken in plaats van UNION ALL om bronnen te combineren vóór een Delta MERGE?+
Ja, maar een JOIN tussen bronnen brengt het risico met zich mee dat rijen worden vermenigvuldigd als de join-sleutel niet uniek is in beide tabellen. Een klant die in zowel bron_a als bron_b voorkomt, levert één rij op met UNION ALL (per tabel), maar mogelijk een cartesisch product met JOIN. UNION ALL gevolgd door ROW_NUMBER() deduplicatie geeft je expliciete controle over welke rij wint. Gebruik JOIN alleen wanneer je een enkele bron wilt verrijken met opzoekgegevens uit een andere tabel, niet wanneer je twee wijzigingsfeeds combineert.
Treedt de DELTA_MULTIPLE_SOURCE_ROW_MATCHING-fout alleen op bij WHEN NOT MATCHED THEN INSERT?+
Nee. De fout treedt specifiek op wanneer meerdere bronrijen overeenkomen met dezelfde doelrij in een WHEN MATCHED-clausule (UPDATE of DELETE). Als je alleen WHEN NOT MATCHED THEN INSERT gebruikt, zullen dubbele bronrijen deze fout niet veroorzaken, omdat ze niet overeenkomen met bestaande doelrijen. Ze zullen echter wel dubbele rijen in het doel invoegen – een ander soort datakwaliteitsprobleem dat hoe dan ook deduplicatie in de bron vereist.
Hoe verwerkt foreachBatch streaming merge meerdere bronnen?+
Bij gestructureerde streaming met foreachBatch wordt elke microbatch onafhankelijk verwerkt. Als je meerdere streaming bronnen samenvoegt tot één stream vóór de merge, veroorzaken duplicaten binnen een enkele microbatch dezelfde fout. Verwijder duplicaten binnen de foreachBatch-functie met behulp van dropDuplicates() of het ROW_NUMBER()-patroon op de batch DataFrame voordat je de merge aanroept. Voor het verwijderen van duplicaten tussen batches is het nodig om de status te behouden of het merge-patroon met alleen invoegen te gebruiken met een unieke voorwaarde in de ON-clausule.

Gerelateerde integraties

Gerelateerde artikelen