UNION ALL in USING wordt wel door de parser verwerkt, maar niet door de uitvoerder.
Het patroon lijkt eenvoudig. Je hebt een tabel met bestellingen in een bronzen laag en een tabel met correcties uit een downstream-systeem. Beide bevatten rijen met als sleutel order_id. Je combineert ze in de USING-clausule:
```sql MERGE INTO gold.orders AS target USING ( SELECT * FROM bronze.orders
UNION ALL SELECT * FROM bronze.corrections ) AS source ON target.order_id = source.order_id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * ```
Dit compileert. Spark bouwt het uitvoeringsplan. Vervolgens loopt de uitvoering vast: DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE. De fout treedt op wanneer meer dan één bronrij overeenkomt met dezelfde doelrij in de ON-voorwaarde en er een WHEN MATCHED-clausule aanwezig is.
De MERGE-functie van Delta volgt de standaard SQL-semantiek. Als order_id = 1001 zowel in bronze.orders als in bronze.corrections voorkomt, kan Spark niet bepalen welke bronrij de UPDATE-waarden moet leveren. In plaats van willekeurig te kiezen – wat de resultaten niet-deterministisch zou maken – geeft Spark een foutmelding.
Dit is geen bug. De engine handhaaft een contract: elke overeenkomende doelrij moet exact overeenkomen met één bronrij. De verwarring ontstaat doordat de UNION ALL-instructie zelf perfect geldige SQL is. De parser waarschuwt niet voor de botsing verderop in de data. Je ontdekt het pas tijdens de uitvoering, nadat Spark de data al heeft geshuffled en samengevoegd. Bij grote tabellen betekent dit dat je al veel clustertijd hebt verbruikt voordat je de fout ziet.
De foutmelding zelf is enigszins onduidelijk. Er staat dat er meerdere bronrijen overeenkomen, maar er wordt niet aangegeven welke sleutels dubbel voorkomen in de bronnen. Bij een gecombineerde bron van 50 miljoen rijen blijft dit een zoektocht.
Verwijder duplicaten vóór de MERGE-bewerking, niet binnen de WHEN-clausule.
De oplossing is een CTE of subquery die de gecombineerde bron reduceert tot één rij per sleutel voordat MERGE deze te zien krijgt. Het meest betrouwbare patroon maakt gebruik van ROW_NUMBER():
```sql WITH combined AS (
SELECT *, 'orders' AS _source FROM bronze.orders
UNION ALL
SELECT *, 'corrections' AS _source FROM bronze.corrections ), deduped AS (
SELECT * FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY CASE _source
WHEN 'corrections' THEN 1
WHEN 'orders' THEN 2
END,
updated_at DESC
) AS rn
FROM combined
)
WHERE rn = 1 ) MERGE INTO gold.orders AS target USING deduped AS source ON target.order_id = source.order_id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * ```
De _source De tag en de CASE-expressie bepalen de prioriteit: correcties hebben voorrang op volgordes. Binnen dezelfde bron wint de meest recente updated_at. Het buitenste filter WHERE rn = 1 garandeert één rij per order_id.
Deze aanpak heeft een nadeel. De vensterfunctie ROW_NUMBER dwingt een shuffle af op de partitiesleutel. Bij een gecombineerde bron met 100 miljoen rijen betekent dit een volledige gegevensuitwisseling over het cluster. Twee dingen helpen: ten eerste, als je bronnen al gepartitioneerd of Z-geordend zijn op de samenvoegsleutel, kan Spark een sorteer-samenvoegstrategie gebruiken in plaats van een hash-shuffle. Ten tweede kunt je elke bron filteren om alleen recente wijzigingen op te nemen (met behulp van _commit_version uit de wijzigingsdatafeed van Delta of een watermarkkolom), zodat de gecombineerde dataset klein is ten opzichte van het doel.
Een aantrekkelijk alternatief – het gebruik van QUALIFY in plaats van de subquery – werkt op Databricks SQL-warehouses, maar niet op alle Databricks Runtime-versies voor notebooks. Blijf bij het subquery-patroon voor betere portabiliteit.
Sequentiële MERGE per bron zorgt voor ergere problemen dan duplicaten.
Wanneer engineers de foutmelding 'multiple-match' tegenkomen, is een veelvoorkomende reactie om de bewerking op te splitsen: één MERGE uitvoeren voor bronze.orders en vervolgens een tweede MERGE voor bronze.corrections. Dit voorkomt de ambiguïteitsfout omdat elke MERGE een enkele bron heeft. Het introduceert echter ook twee problemen die lastiger te debuggen zijn dan het oorspronkelijke probleem.
Ten eerste herschrijft elke MERGE de betreffende bestanden in de doeltabel. Delta's MERGE-implementatie leest de doeltabel, identificeert overeenkomende bestanden en schrijft nieuwe versies van die bestanden met de toegepaste updates. Twee opeenvolgende MERGE's die overlappende sleutelbereiken raken, zullen elk dezelfde bestanden herschrijven. In een doeltabel met 500 Parquet-bestanden herschrijft de eerste MERGE er mogelijk 120. De tweede MERGE herschrijft er nog eens 80 – waarvan sommige overlappen met de eerste batch. Je hebt nu ongeveer 200 bestanden herschreven in plaats van de 130 die een enkele MERGE zou vereisen. Bij grote tabellen verdubbelt dit de schrijfversterking en het aantal transactielogboekvermeldingen.
Ten tweede is er een risico op gelijktijdigheidsproblemen. Als een andere taak tussen je twee MERGE-bewerkingen naar dezelfde doeltabel schrijft, kan de tweede MERGE-bewerking mislukken met een ConcurrentAppendException of ConcurrentDeleteReadException, afhankelijk van het isolatieniveau. Zelfs met serialiseerbare isolatie moet de tweede MERGE-bewerking opnieuw worden uitgevoerd op de nieuwe tabelversie, wat extra latentie oplevert.
De sequentiële aanpak doorbreekt ook de atomiciteit. Als de eerste MERGE-bewerking slaagt en de tweede mislukt – bijvoorbeeld vanwege een schemafout in bronze.corrections – bevindt je doeltabel zich in een half bijgewerkte staat. Met de enkele CTE-gebaseerde MERGE-bewerking wordt ofwel de volledige upsert gecommit, ofwel niets.
Er is een derde probleem dat specifiek is voor Databricks-taken: elke MERGE-bewerking is een aparte Spark-actie, wat betekent dat er een aparte fase in de Spark-gebruikersinterface wordt weergegeven. Debuggen is lastiger omdat je twee fasen correleert met twee verschillende sets meetwaarden, shuffle reads en spill-statistieken. Een enkele MERGE geeft je slechts één fase om te inspecteren.
Vaststellen welke sleutels conflicteren tussen verschillende bronnen
Voordat je de ROW_NUMBER-correctie toepast, moet je controleren welke sleutels daadwerkelijk conflicteren. Dit voorkomt dat je een overbodige deduplicatiestap uitvoert die mogelijk problemen met de datakwaliteit in de brongegevens maskeert.
Voer de volgende diagnostische query uit op je gecombineerde bron:
```sql WITH combined AS (
SELECT order_id, 'orders' AS _source FROM bronze.orders
UNION ALL
SELECT order_id, 'corrections' AS _source FROM bronze.corrections ) SELECT order_id, COUNT(*) AS row_count,
COUNT(DISTINCT _source) AS source_count,
COLLECT_SET(_source) AS sources FROM combined GROUP BY order_id HAVING COUNT(*) > 1 ORDER BY row_count DESC LIMIT 50 ```
Dit geeft je drie dingen: welke sleutels gedupliceerd zijn, hoeveel rijen elke sleutel heeft en of de duplicaten afkomstig zijn van dezelfde bron of van verschillende bronnen. Als duplicaten zich binnen één bron bevinden, ligt het probleem stroomopwaarts — je bronzen tabel bevat dubbele records en de UNION ALL is slechts een bijkomstigheid. Als duplicaten zich over meerdere bronnen uitstrekken, hebt je de op prioriteit gebaseerde deduplicatie nodig.
Een andere nuttige diagnostische controle: controleer of de botsingen stabiel zijn of toenemen. Voer dezelfde query uit op opeenvolgende microbatches of dagelijkse laadprocessen. Als het aantal botsingen lineair toeneemt met het laadvolume, hebben je bronnen overlappende bereiken — bijvoorbeeld een correctiefeed die het oorspronkelijke record samen met de correctie afspeelt. Als botsingen sporadisch voorkomen, kan er sprake zijn van een intermitterend probleem stroomopwaarts, zoals een CDC-herhaling of een bronsysteem dat records opnieuw verzendt na een herstel.
Voor streaming MERGE (met behulp van foreachBatch) is de diagnostische controle hetzelfde, maar past je deze toe binnen de batch functie. Registreer het aantal botsingen als een aangepaste Spark-metriek, zodat je dit in de loop van de tijd kunt volgen zonder ad-hocquery's opnieuw uit te voeren.
Het streamen van foreachBatch vereist dezelfde deduplicatiebeveiliging.
Als je een gestructureerde streaming taak uitvoert die gegevens uit meerdere bronnen samenvoegt in een Delta-tabel met behulp van foreachBatch, treedt dezelfde fout op, maar de aard van de fout is ernstiger. Een batchtaak die mislukt, kan handmatig opnieuw worden gestart. Een streaming taak die mislukt bij batch N zal opnieuw worden gestart, dezelfde microbatch opnieuw verwerken, dezelfde botsing tegenkomen en opnieuw mislukken in een oneindige lus.
Het patroon binnen foreachBatch ziet er in PySpark als volgt uit:
```python def upsert_to_gold(batch_df, batch_id):
batch_df.createOrReplaceTempView("source_batch")
spark.sql("""
WITH deduped AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY updated_at DESC
) AS rn
FROM source_batch
)
MERGE INTO gold.orders AS target
USING (SELECT * FROM deduped WHERE rn = 1) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""") ```
De batch_df kan hier al het resultaat zijn van het lezen van meerdere bronnen die stroomopwaarts in de streaming grafiek zijn samengevoegd of gecombineerd. De deduplicatie binnen foreachBatch is een vangnet, geen vervanging voor het oplossen van duplicatie in de upstream-processen.
Een belangrijk detail: als je spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true") gebruikt, zal de MERGE-bewerking de uitvoerbestanden herpartitioneren volgens het partitioneringsschema van de doeldata. Dit is handig om te voorkomen dat kleine bestanden worden samengevoegd, maar voegt extra overhead toe aan de shuffle op basis van het aantal rijen. Bij kleine batches (minder dan 1 miljoen rijen) is deze overhead verwaarloosbaar. Bij grotere batches kunt je overwegen of je beide nodig hebt.
MetricSign detecteert Databricks-taakfouten met de specifieke foutcontext — inclusief de foutklasse DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE — en groepeert terugkerende fouten bij dezelfde taak, zodat je een eenmalige gegevensbotsing kunt onderscheiden van een systemisch probleem in de upstream-server voordat de vertraging in de streaming oploopt tot urenlang ontbrekende gegevens.
DBR 16.1 versoepelt de beperking, maar heft deze niet op.
Databricks Runtime 16.1 introduceerde een engere interpretatie van de regel voor meerdere overeenkomsten. Voorheen mislukte de MERGE-bewerking als twee bronrijen overeenkwamen met dezelfde doelrij in de ON-clausule, zelfs als slechts één van die bronrijen voldeed aan de WHEN MATCHED-voorwaarde. DBR 16.1 staat nu toe dat de MERGE-bewerking doorgaat zolang precies één bronrij voldoet aan het volledige WHEN MATCHED-predicaat.
Dit betekent dat je het volgende kunt schrijven:
``sql
MERGE INTO gold.orders AS target
USING combined_source AS source
ON target.order_id = source.order_id
WHEN MATCHED AND source._source = 'corrections' THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
``
Als order_id = 1001 zowel in de bronnen orders als correcties voorkomt, maar alleen de rij correcties voldoet aan _source = 'corrections', slaagt de MERGE op DBR 16.1+. Op oudere runtimes mislukt het nog steeds.
Dit is nuttig, maar beperkt. Het werkt alleen als je prioriteit kunt aangeven als filter in de WHEN MATCHED-clausule. Als beide bronrijen _source = 'corrections' hebben — omdat de correctiefeed zelf duplicaten bevat — krijg je dezelfde foutmelding als voorheen. De ROW_NUMBER-aanpak behandelt beide gevallen.
Er is ook een probleem met de portabiliteit. Als je Databricks-werkruimte verschillende runtime versies gebruikt in verschillende clusters – bijvoorbeeld het interactieve cluster op DBR 15.4 en het jobcluster op DBR 16.2 – dan werkt hetzelfde notebook wel op het ene cluster, maar niet op het andere. Het CTE-deduplicatiepatroon werkt identiek op elke runtime versie vanaf DBR 10, waardoor het de veiligere standaard is voor productie pipelines die clusterupgrades en werkruimtemigraties moeten overleven.