MetricSign
NL|ENStart free →
Best Practices8 min·

Delta MERGE vanuit meerdere brontabellen mislukt omdat UNION ALL niet voldoende is.

Een UNION ALL in de USING-clausule lijkt correct totdat twee brontabellen een rij voor dezelfde sleutel aanleveren. Delta verwerpt de ambiguïteit direct.

Read this article in English →

UNION ALL in USING wordt wel door de parser verwerkt, maar niet door de uitvoerder.

Het patroon lijkt eenvoudig. Je hebt een tabel met bestellingen in een bronzen laag en een tabel met correcties uit een downstream-systeem. Beide bevatten rijen met als sleutel order_id. Je combineert ze in de USING-clausule:

```sql MERGE INTO gold.orders AS target USING ( SELECT * FROM bronze.orders

UNION ALL SELECT * FROM bronze.corrections ) AS source ON target.order_id = source.order_id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * ```

Dit compileert. Spark bouwt het uitvoeringsplan. Vervolgens loopt de uitvoering vast: DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE. De fout treedt op wanneer meer dan één bronrij overeenkomt met dezelfde doelrij in de ON-voorwaarde en er een WHEN MATCHED-clausule aanwezig is.

De MERGE-functie van Delta volgt de standaard SQL-semantiek. Als order_id = 1001 zowel in bronze.orders als in bronze.corrections voorkomt, kan Spark niet bepalen welke bronrij de UPDATE-waarden moet leveren. In plaats van willekeurig te kiezen – wat de resultaten niet-deterministisch zou maken – geeft Spark een foutmelding.

Dit is geen bug. De engine handhaaft een contract: elke overeenkomende doelrij moet exact overeenkomen met één bronrij. De verwarring ontstaat doordat de UNION ALL-instructie zelf perfect geldige SQL is. De parser waarschuwt niet voor de botsing verderop in de data. Je ontdekt het pas tijdens de uitvoering, nadat Spark de data al heeft geshuffled en samengevoegd. Bij grote tabellen betekent dit dat je al veel clustertijd hebt verbruikt voordat je de fout ziet.

De foutmelding zelf is enigszins onduidelijk. Er staat dat er meerdere bronrijen overeenkomen, maar er wordt niet aangegeven welke sleutels dubbel voorkomen in de bronnen. Bij een gecombineerde bron van 50 miljoen rijen blijft dit een zoektocht.

Verwijder duplicaten vóór de MERGE-bewerking, niet binnen de WHEN-clausule.

De oplossing is een CTE of subquery die de gecombineerde bron reduceert tot één rij per sleutel voordat MERGE deze te zien krijgt. Het meest betrouwbare patroon maakt gebruik van ROW_NUMBER():

```sql WITH combined AS (

SELECT *, 'orders' AS _source FROM bronze.orders

UNION ALL

SELECT *, 'corrections' AS _source FROM bronze.corrections ), deduped AS (

SELECT * FROM (

SELECT *,

ROW_NUMBER() OVER (

PARTITION BY order_id

ORDER BY CASE _source

WHEN 'corrections' THEN 1

WHEN 'orders' THEN 2

END,

updated_at DESC

) AS rn

FROM combined

)

WHERE rn = 1 ) MERGE INTO gold.orders AS target USING deduped AS source ON target.order_id = source.order_id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * ```

De _source De tag en de CASE-expressie bepalen de prioriteit: correcties hebben voorrang op volgordes. Binnen dezelfde bron wint de meest recente updated_at. Het buitenste filter WHERE rn = 1 garandeert één rij per order_id.

Deze aanpak heeft een nadeel. De vensterfunctie ROW_NUMBER dwingt een shuffle af op de partitiesleutel. Bij een gecombineerde bron met 100 miljoen rijen betekent dit een volledige gegevensuitwisseling over het cluster. Twee dingen helpen: ten eerste, als je bronnen al gepartitioneerd of Z-geordend zijn op de samenvoegsleutel, kan Spark een sorteer-samenvoegstrategie gebruiken in plaats van een hash-shuffle. Ten tweede kunt je elke bron filteren om alleen recente wijzigingen op te nemen (met behulp van _commit_version uit de wijzigingsdatafeed van Delta of een watermarkkolom), zodat de gecombineerde dataset klein is ten opzichte van het doel.

Een aantrekkelijk alternatief – het gebruik van QUALIFY in plaats van de subquery – werkt op Databricks SQL-warehouses, maar niet op alle Databricks Runtime-versies voor notebooks. Blijf bij het subquery-patroon voor betere portabiliteit.

Delta MERGE with multi-source deduplication UNION ALL source tables into CTE Tag each row with _source identifier Apply ROW_NUMBER() OVER (PARTITION BY merge_key ORDER BY Filter WHERE rn = 1 Pass deduplicated result to MERGE INTO USING clause Single atomic commit to target Delta table
Delta MERGE with multi-source deduplication

Sequentiële MERGE per bron zorgt voor ergere problemen dan duplicaten.

Wanneer engineers de foutmelding 'multiple-match' tegenkomen, is een veelvoorkomende reactie om de bewerking op te splitsen: één MERGE uitvoeren voor bronze.orders en vervolgens een tweede MERGE voor bronze.corrections. Dit voorkomt de ambiguïteitsfout omdat elke MERGE een enkele bron heeft. Het introduceert echter ook twee problemen die lastiger te debuggen zijn dan het oorspronkelijke probleem.

Ten eerste herschrijft elke MERGE de betreffende bestanden in de doeltabel. Delta's MERGE-implementatie leest de doeltabel, identificeert overeenkomende bestanden en schrijft nieuwe versies van die bestanden met de toegepaste updates. Twee opeenvolgende MERGE's die overlappende sleutelbereiken raken, zullen elk dezelfde bestanden herschrijven. In een doeltabel met 500 Parquet-bestanden herschrijft de eerste MERGE er mogelijk 120. De tweede MERGE herschrijft er nog eens 80 – waarvan sommige overlappen met de eerste batch. Je hebt nu ongeveer 200 bestanden herschreven in plaats van de 130 die een enkele MERGE zou vereisen. Bij grote tabellen verdubbelt dit de schrijfversterking en het aantal transactielogboekvermeldingen.

Ten tweede is er een risico op gelijktijdigheidsproblemen. Als een andere taak tussen je twee MERGE-bewerkingen naar dezelfde doeltabel schrijft, kan de tweede MERGE-bewerking mislukken met een ConcurrentAppendException of ConcurrentDeleteReadException, afhankelijk van het isolatieniveau. Zelfs met serialiseerbare isolatie moet de tweede MERGE-bewerking opnieuw worden uitgevoerd op de nieuwe tabelversie, wat extra latentie oplevert.

De sequentiële aanpak doorbreekt ook de atomiciteit. Als de eerste MERGE-bewerking slaagt en de tweede mislukt – bijvoorbeeld vanwege een schemafout in bronze.corrections – bevindt je doeltabel zich in een half bijgewerkte staat. Met de enkele CTE-gebaseerde MERGE-bewerking wordt ofwel de volledige upsert gecommit, ofwel niets.

Er is een derde probleem dat specifiek is voor Databricks-taken: elke MERGE-bewerking is een aparte Spark-actie, wat betekent dat er een aparte fase in de Spark-gebruikersinterface wordt weergegeven. Debuggen is lastiger omdat je twee fasen correleert met twee verschillende sets meetwaarden, shuffle reads en spill-statistieken. Een enkele MERGE geeft je slechts één fase om te inspecteren.

Vaststellen welke sleutels conflicteren tussen verschillende bronnen

Voordat je de ROW_NUMBER-correctie toepast, moet je controleren welke sleutels daadwerkelijk conflicteren. Dit voorkomt dat je een overbodige deduplicatiestap uitvoert die mogelijk problemen met de datakwaliteit in de brongegevens maskeert.

Voer de volgende diagnostische query uit op je gecombineerde bron:

```sql WITH combined AS (

SELECT order_id, 'orders' AS _source FROM bronze.orders

UNION ALL

SELECT order_id, 'corrections' AS _source FROM bronze.corrections ) SELECT order_id, COUNT(*) AS row_count,

COUNT(DISTINCT _source) AS source_count,

COLLECT_SET(_source) AS sources FROM combined GROUP BY order_id HAVING COUNT(*) > 1 ORDER BY row_count DESC LIMIT 50 ```

Dit geeft je drie dingen: welke sleutels gedupliceerd zijn, hoeveel rijen elke sleutel heeft en of de duplicaten afkomstig zijn van dezelfde bron of van verschillende bronnen. Als duplicaten zich binnen één bron bevinden, ligt het probleem stroomopwaarts — je bronzen tabel bevat dubbele records en de UNION ALL is slechts een bijkomstigheid. Als duplicaten zich over meerdere bronnen uitstrekken, hebt je de op prioriteit gebaseerde deduplicatie nodig.

Een andere nuttige diagnostische controle: controleer of de botsingen stabiel zijn of toenemen. Voer dezelfde query uit op opeenvolgende microbatches of dagelijkse laadprocessen. Als het aantal botsingen lineair toeneemt met het laadvolume, hebben je bronnen overlappende bereiken — bijvoorbeeld een correctiefeed die het oorspronkelijke record samen met de correctie afspeelt. Als botsingen sporadisch voorkomen, kan er sprake zijn van een intermitterend probleem stroomopwaarts, zoals een CDC-herhaling of een bronsysteem dat records opnieuw verzendt na een herstel.

Voor streaming MERGE (met behulp van foreachBatch) is de diagnostische controle hetzelfde, maar past je deze toe binnen de batch functie. Registreer het aantal botsingen als een aangepaste Spark-metriek, zodat je dit in de loop van de tijd kunt volgen zonder ad-hocquery's opnieuw uit te voeren.

Het streamen van foreachBatch vereist dezelfde deduplicatiebeveiliging.

Als je een gestructureerde streaming taak uitvoert die gegevens uit meerdere bronnen samenvoegt in een Delta-tabel met behulp van foreachBatch, treedt dezelfde fout op, maar de aard van de fout is ernstiger. Een batchtaak die mislukt, kan handmatig opnieuw worden gestart. Een streaming taak die mislukt bij batch N zal opnieuw worden gestart, dezelfde microbatch opnieuw verwerken, dezelfde botsing tegenkomen en opnieuw mislukken in een oneindige lus.

Het patroon binnen foreachBatch ziet er in PySpark als volgt uit:

```python def upsert_to_gold(batch_df, batch_id):

batch_df.createOrReplaceTempView("source_batch")

spark.sql("""

WITH deduped AS (

SELECT *,

ROW_NUMBER() OVER (

PARTITION BY order_id

ORDER BY updated_at DESC

) AS rn

FROM source_batch

)

MERGE INTO gold.orders AS target

USING (SELECT * FROM deduped WHERE rn = 1) AS source

ON target.order_id = source.order_id

WHEN MATCHED THEN UPDATE SET *

WHEN NOT MATCHED THEN INSERT *

""") ```

De batch_df kan hier al het resultaat zijn van het lezen van meerdere bronnen die stroomopwaarts in de streaming grafiek zijn samengevoegd of gecombineerd. De deduplicatie binnen foreachBatch is een vangnet, geen vervanging voor het oplossen van duplicatie in de upstream-processen.

Een belangrijk detail: als je spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true") gebruikt, zal de MERGE-bewerking de uitvoerbestanden herpartitioneren volgens het partitioneringsschema van de doeldata. Dit is handig om te voorkomen dat kleine bestanden worden samengevoegd, maar voegt extra overhead toe aan de shuffle op basis van het aantal rijen. Bij kleine batches (minder dan 1 miljoen rijen) is deze overhead verwaarloosbaar. Bij grotere batches kunt je overwegen of je beide nodig hebt.

MetricSign detecteert Databricks-taakfouten met de specifieke foutcontext — inclusief de foutklasse DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE — en groepeert terugkerende fouten bij dezelfde taak, zodat je een eenmalige gegevensbotsing kunt onderscheiden van een systemisch probleem in de upstream-server voordat de vertraging in de streaming oploopt tot urenlang ontbrekende gegevens.

DBR 16.1 versoepelt de beperking, maar heft deze niet op.

Databricks Runtime 16.1 introduceerde een engere interpretatie van de regel voor meerdere overeenkomsten. Voorheen mislukte de MERGE-bewerking als twee bronrijen overeenkwamen met dezelfde doelrij in de ON-clausule, zelfs als slechts één van die bronrijen voldeed aan de WHEN MATCHED-voorwaarde. DBR 16.1 staat nu toe dat de MERGE-bewerking doorgaat zolang precies één bronrij voldoet aan het volledige WHEN MATCHED-predicaat.

Dit betekent dat je het volgende kunt schrijven:

``sql MERGE INTO gold.orders AS target USING combined_source AS source ON target.order_id = source.order_id WHEN MATCHED AND source._source = 'corrections' THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * ``

Als order_id = 1001 zowel in de bronnen orders als correcties voorkomt, maar alleen de rij correcties voldoet aan _source = 'corrections', slaagt de MERGE op DBR 16.1+. Op oudere runtimes mislukt het nog steeds.

Dit is nuttig, maar beperkt. Het werkt alleen als je prioriteit kunt aangeven als filter in de WHEN MATCHED-clausule. Als beide bronrijen _source = 'corrections' hebben — omdat de correctiefeed zelf duplicaten bevat — krijg je dezelfde foutmelding als voorheen. De ROW_NUMBER-aanpak behandelt beide gevallen.

Er is ook een probleem met de portabiliteit. Als je Databricks-werkruimte verschillende runtime versies gebruikt in verschillende clusters – bijvoorbeeld het interactieve cluster op DBR 15.4 en het jobcluster op DBR 16.2 – dan werkt hetzelfde notebook wel op het ene cluster, maar niet op het andere. Het CTE-deduplicatiepatroon werkt identiek op elke runtime versie vanaf DBR 10, waardoor het de veiligere standaard is voor productie pipelines die clusterupgrades en werkruimtemigraties moeten overleven.

Veelgestelde vragen

Waarom mislukt Delta MERGE bij gebruik van UNION ALL met meerdere brontabellen?+
Delta MERGE volgt de SQL-standaardsemantiek en vereist dat elke overeenkomende doelrij exact overeenkomt met één bronrij. Wanneer UNION ALL bronnen combineert die dezelfde sleutels delen, komen meerdere bronrijen overeen met dezelfde doelrij en kan de engine niet bepalen welke rij de UPDATE-waarden moet leveren. In plaats van willekeurig een rij te kiezen, genereert de engine de foutmelding DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE.
Moet ik in plaats daarvan aparte MERGE-instructies uitvoeren voor elke brontabel?+
Nee. Sequentiële MERGE-bewerkingen herschrijven overlappende doelbestanden twee keer, waardoor de schrijfversterking verdubbelt. Ze verbreken ook de atomiciteit: als de tweede MERGE-bewerking mislukt, is je doelbestand slechts gedeeltelijk bijgewerkt. Gebruik een enkele MERGE-bewerking met een CTE die de gecombineerde bron ontdubbelt met behulp van ROW_NUMBER vóór de USING-clausule.
Lost Databricks Runtime 16.1 de fout op die optreedt bij het matchen van meerdere bronrijen?+
Gedeeltelijk. DBR 16.1 staat toe dat MERGE slaagt wanneer meerdere bronrijen overeenkomen in de ON-clausule, maar slechts één voldoet aan het WHEN MATCHED-predicaat. Als er duplicaten in dezelfde bron voorkomen of als beide rijen aan het predicaat voldoen, treedt de fout nog steeds op. Het ROW_NUMBER-deduplicatiepatroon werkt in alle runtime versies vanaf DBR 10.

Gerelateerde integraties

Gerelateerde artikelen