MetricSign
NL|ENStart free →
error-reference8 min·

Je Databricks-afstemmingstaak blijft oneindig draaien omdat er geen reden is om hem te stoppen.

Reconciliatietaken vergelijken twee grote datasets rij voor rij. Wanneer die vergelijking nooit convergeert, verbruikt je cluster onnodig veel rekenkracht totdat iemand het opmerkt of het budget op is.

Read this article in English →

Een reconciliatietaak zonder time-out is een belofte zonder einddatum.

Een discussie op het Databricks-communityforum met de titel "Lakebridge reconciliatiecode blijft continu draaien" beschrijft een patroon dat regelmatig voorkomt in migratieprojecten. Je schrijft reconciliatie logica – meestal een volledige outer join tussen een bronsysteem en een Lakehouse-doel – plant deze in als een Databricks-taak, maar deze wordt nooit voltooid. Het cluster blijft actief, de uitvoeringsstatus blijft op 'RUNNING' staan en niemand krijgt een melding omdat de taak technisch gezien niet is mislukt.

Het onderliggende probleem is dat Databricks-taken geen standaard maximale uitvoeringsduur hebben. Als je een taak aanmaakt via de gebruikersinterface of de Jobs API zonder timeout_seconds in te stellen, wordt de taak uitgevoerd totdat deze slaagt, mislukt of handmatig wordt geannuleerd. Voor korte ETL-taken is dit prima. Maar voor reconciliatieworkloads die miljoenen rijen vergelijken tussen twee systemen met verschillende schema's, verschillende partitioneringen en verschillende datadistributies, is het ontbreken van een time-out een uitnodiging voor de taak om uren of zelfs dagen te draaien.

De Jobs API accepteert timeout_seconds op taakniveau:

```json {

"task_key": "reconcile_orders",

"timeout_seconds": 7200,

"notebook_task": {

"notebook_path": "/Repos/prod/reconciliation/orders_recon"

} }

```

Stel dit in op een redelijke waarde — twee tot vier keer de verwachte duur van de reconciliatie. Wanneer de time-out optreedt, wordt de run beëindigd met de status RUN_DURATION_EXCEEDED. Deze status is actiegericht. Een oneindig draaiende taak is dat niet.

Je dient ook max_concurrent_runs in te stellen op 1 voor reconciliatietaken. Als een geplande trigger wordt geactiveerd terwijl de vorige run nog bezig is, start een tweede instantie met het vergelijken van dezelfde gegevens. Twee gelijktijdige reconciliatie runs op dezelfde tabellen zullen vastlopen bij Delta-tabelcommits of, op zijn minst, je rekenkosten verdubbelen zonder enig voordeel.

Volledige buitenverbindingen op scheve sleutels produceren shuffle-partities die nooit voltooid worden.

Logica voor het samenvoegen van gegevens omvat bijna altijd een volledige outer join. Je moet rijen vinden die wel in de bron maar niet in het doel voorkomen, rijen in het doel die niet in de bron voorkomen, en rijen waarvan de waarden verschillen. Het standaardpatroon ziet er als volgt uit:

```python df_diff = df_source.join(df_target, on="order_id", how="full_outer")

.filter(

col("source.amount") != col("target.amount") |

col("source.order_id").isNull() |

col("target.order_id").isNull()

) ```

Dit werkt voor kleine tabellen. Bij tabellen met tientallen miljoenen rijen zorgt de join voor een shuffle. De standaardwaarde van spark.sql.shuffle.partitions in Spark is 200. Als je join-sleutel een hoge kardinaliteit heeft maar een ongelijke verdeling — bijvoorbeeld 40% van de orders is afkomstig van drie grote klanten — dan krijgt een handvol shuffle-partities een onevenredig groot deel van de data. Deze partities duren vele malen langer dan de rest.

Je ziet dit terug in de Spark-gebruikersinterface: 197 van de 200 taken worden binnen enkele minuten voltooid, terwijl drie taken urenlang op 99% blijven hangen. De fase wordt nooit afgerond. De taak wordt nooit voltooid.

Twee oplossingen werken betrouwbaar. Ten eerste, verhoog het aantal shuffle-partities om de scheefheid te spreiden:

``python spark.conf.set("spark.sql.shuffle.partitions", 2000) ``

Ten tweede, en effectiever, kunt je je reconciliatie partitioneren op basis van een datumkolom of een modulo van de join-sleutel. In plaats van één enorme full-outer join, voer je 30 reconciliatiestappen uit — één per dag met data:

```python for day in date_range:

df_source_day = df_source.filter(col("order_date") == day)

df_target_day = df_target.filter(col("order_date") == day)

concrecile(df_source_day, df_target_day) ```

Elke stap past comfortabel in het geheugen. De totale rekentijd is vaak korter omdat geen enkele shuffle-partitie een knelpunt vormt.

Pre-launch checklist voor Databricks reconciliation jobs 1 Stel timeout_seconds in op 3x de verwachte duur 2 Stel max_concurrent_runs in op 1 3 Partitioneer reconciliation op datum of key-bereik 4 Gebruik storage-geoptimaliseerde instanties (i3 / L-series) 5 Controleer of spark.local.dir naar de NVMe-mount wijst 6 Voeg health rule toe voor RUN_DURATION_SECONDS 7 Controleer het tabblad Stages van de Spark UI na de eerste productierun
Pre-launch checklist voor Databricks reconciliation jobs

Shuffle voert schrijfbewerkingen naar de schijf uit, waardoor de lokale SSD vol raakt en de executor stilzwijgend vastloopt.

Wanneer een reconciliatie-join de beschikbare geheugencapaciteit van de executor overschrijdt, schrijft Spark shuffle-data naar de lokale SSD. Dit is normaal gedrag – Spark is ontworpen om dit af te handelen. Er is echter een drempelwaarde waarbij het wegschrijven van data problematisch wordt. Als de weggeschreven data de lokale schijfcapaciteit van het worker-node overschrijdt, komt de executor in een oneindige lus terecht: hij schrijft gedeeltelijke shuffle-data, raakt zonder ruimte, voert agressieve garbage collection uit, maakt een klein beetje geheugen vrij, schrijft nog wat meer en herhaalt dit. De executor crasht niet. Hij genereert geen OutOfMemoryError. Hij draait gewoon oneindig lang met een bijna nul-doorvoer.

Je kunt dit detecteren via het tabblad Stages in de Spark UI. Zoek naar taken waarbij "Shuffle Spill (Disk)" wordt gemeten in tientallen gigabytes, terwijl "Shuffle Spill (Memory)" een fractie daarvan is. Een verhouding van meer dan 10:1 tussen schijf- en geheugenspill betekent dat de executor overbelast is.

De oplossing hangt af van de clusterconfiguratie. Kies voor jobclusters instantie types met NVMe-opslag die evenredig is aan het datavolume. Op AWS bieden i3.xlarge-instanties 950 GB lokale NVMe-opslag. Op Azure biedt Standard_L8s_v2 1,9 TB. Als je reconciliatie uitvoert op algemene instanties zoals m5.2xlarge met alleen EBS-opslag, zullen joins met veel shuffles de maximale schijfdoorvoer bereiken ruim voordat ze de CPU of het netwerk volledig belasten.

Controleer ook spark.local.dir. Standaard verwijst deze naar /tmp, wat in sommige Databricks runtime-versies naar een klein rootvolume verwijst in plaats van naar de aangesloten NVMe-schijven. Stel het expliciet in op de NVMe-mount:

``python spark.conf.set("spark.local.dir", "/local_disk0") ``

Deze ene configuratiewijziging heeft de symptomen van "oneindige uitvoering" opgelost in gevallen waarin de reconciliatie logica zelf correct was, maar de executor stilletjes bleef draaien op een rootvolume van 50 GB.

Het cluster wordt automatisch beëindigd, maar de taak niet: een misleidend vangnet.

Teams gaan er soms vanuit dat het automatisch beëindigen van een cluster een vastgelopen taak zal stoppen. Dat is niet het geval, althans niet op de manier waarop ze dat verwachten. Automatisch beëindigen is van toepassing op algemene (interactieve) clusters die gedurende een instelbare periode inactief zijn geweest. Taakclusters werken anders. Een taakcluster wordt aangemaakt wanneer de uitvoering start en vernietigd wanneer de uitvoering eindigt. Als de uitvoering nooit eindigt, wordt het cluster ook nooit beëindigd.

Dit onderscheid is belangrijk voor de kosten. Een Standard_E8s_v3-instantie op Azure kost ongeveer $ 0,50 per uur. Een reconciliatietaak die een week lang onopgemerkt draait, kost $ 84 per node. Op een cluster met vier nodes is dat $ 336 aan pure verspilling – meer als je werkt met spot-instanties die na een preemptie zijn gepromoveerd naar on-demand.

Databricks biedt twee mechanismen om dit te voorkomen. Het eerste is timeout_seconds op de taak, zoals hierboven beschreven. De tweede is het health-blok in de taakdefinitie, beschikbaar sinds Jobs API 2.1:

```json {

"health": {

"rules": [

{

"metric": "RUN_DURATION_SECONDS",

"op": "GREATER_THAN",

"value": 10800

}

]

}

}

```

Wanneer de health-regel wordt geactiveerd, markeert Databricks de run als ongezond en kan een melding verzenden via een webhook of e-mail. In tegenstelling tot timeout_seconds beëindigt de health-regel de run niet, maar geeft een waarschuwing. Gebruik beide: de health-regel als een vroegtijdige waarschuwing en de timeout als een harde stop.

Je kunt de looptijd van taken ook programmatisch opvragen via de Runs API:

```bash curl -X GET "https://.azuredatabricks.net/api/2.1/jobs/runs/list" \

-H "Authorization: Bearer $TOKEN" \

-d '{"job_id": 12345, "active_only": true}' ```

Als de start_time van een actieve taak meer dan een paar uur oud is voor een taak die normaal gesproken binnen 30 minuten klaar is, is er iets mis. Maar het opvragen van gegevens via een API vereist dat iemand de polling-functionaliteit bouwt en onderhoudt. De meeste teams doen dat niet.

Reconciliatietools voegen abstractielagen toe die de feedback van Spark verbergen.

Lakebridge en vergelijkbare tools voor het afstemmen van migraties genereren Spark SQL- of DataFrame-code vanuit een configuratielaag. Je definieert de bron- en doeltabellen, specificeert de sleutelkolommen en de tool genereert de vergelijkingslogica. Dit is efficiënt voor het opzetten van afstemming over honderden tabellen tijdens een migratie. Het nadeel is dat de gegenereerde code mogelijk niet geoptimaliseerd is voor je specifieke gegevensdistributie en dat de abstractielaag van de tool het runtime gedrag van Spark kan verbergen.

Wanneer een gegenereerde afstemmingsquery te lang duurt, is de eerste reactie om de logboeken van de tool te bekijken. Maar de logboeken van de tool tonen meestal "afstemming bezig" — ze tonen niet de shuffle spill ratio's, scheve partities of problemen met de schijfdoorvoer die zich binnen Spark voordoen. Je moet rechtstreeks naar de Spark UI gaan.

Open de Spark UI van het Databricks-cluster, ga naar het tabblad Stages en sorteer op duur. Zoek de stage die overeenkomt met de join-bewerking. Klik erop en bekijk de statistieken op taakniveau. Als één of twee taken een duur hebben die 100 keer langer is dan de mediaan, is er sprake van een scheefheidsprobleem. Als alle taken traag zijn en een hoge garbage collection-tijd hebben, is er een geheugenprobleem. Als taken snel zijn, maar er duizenden in de wachtrij staan, is er een paralleliseringsprobleem.

Voor migratieprojecten waarbij reconciliatie over meerdere tabellen plaatsvindt, kunt je de reconciliatie van elke tabel in een try/except-blok plaatsen met een time-out per tabel met behulp van concurrent.futures van Python:

```python from concurrent.futures import ThreadPoolExecutor, TimeoutError

def reconcile_table(table_name):

# reconciliatie logica hier

pass

with ThreadPoolExecutor(max_workers=1) as executor:

future = executor.submit(reconcile_table, "orders")

try:

future.result(timeout=3600)

except TimeoutError:

print(f"Reconciliatie van orders duurde langer dan 1 uur")

future.cancel() ```

Dit geeft je controle over time-outs op tabelniveau. MetricSign monitort Databricks-taakuitvoeringen en geeft een signaal wanneer een uitvoering de historische duur overschrijdt. Het incident wordt gegroepeerd met statistieken op cluster niveau, zodat je kunt zien of de oorzaak ligt bij scheve data, overbelasting van het geheugen van de executor of een configuratieprobleem, zonder dat je hiervoor aangepaste polling via de Runs API hoeft te implementeren.

Een checklist voordat je grootschalige reconciliatie uitvoert.

Voordat je een reconciliatietaak plant die tabellen op productieschaal vergelijkt, doorloopt je deze configuratie-instellingen. Elke instelling pakt een specifiek foutscenario aan dat oneindige uitvoeringen veroorzaakt.

Stel timeout_seconds in voor elke reconciliatietaak. Er is geen geldige reden waarom een reconciliatietaak oneindig lang zou moeten draaien. Als je de verwachte duur niet weet, voert je de reconciliatie eenmaal handmatig uit, meet je de duur en stelt je de time-out in op 3x die waarde.

Configureer max_concurrent_runs: 1. Overlappende reconciliatie-uitvoeringen veroorzaken conflicten bij het vastleggen van Delta-tabellen, verdubbelen de rekenkosten en produceren dubbele discrepantierecords.

Partitioneer de reconciliatie op datum of sleutelbereik. Een enkele full-outer join over 500 miljoen rijen zal meer gegevens verschuiven dan de meeste clusters binnen een redelijke tijd aankunnen. Verdeel de reconciliatie in dagelijkse of wekelijkse partities.

Kies voor opslaggeoptimaliseerde instanties voor het taakcluster. Reconciliatie is een proces dat veel gegevens verschuift. Instanties met lokale NVMe-opslag (i3 op AWS, L-series op Azure) verwerken spills zonder overbelasting.

Controleer of spark.local.dir naar de NVMe-mount verwijst en niet naar /tmp. Dit wordt vaak over het hoofd gezien en heeft een onevenredig grote impact op workloads die veel shuffles uitvoeren.

Voeg een health rule toe met RUN_DURATION_SECONDS als waarschuwing voor vroegtijdige uitvoering. Stel deze in om te activeren vóór de harde time-out, zodat je tijd heeft om het probleem te onderzoeken voordat de run wordt afgebroken.

Controleer het tabblad Stages in de Spark UI na de eerste productierun. Controleer of geen enkele taak onevenredig lang duurt. Als dit wel het geval is, zal de scheefheid in je join key alleen maar toenemen naarmate de data groeit.

Elk van deze wijzigingen kost slechts vijf minuten. Gezamenlijk voorkomen ze het scenario dat teams om 2 uur 's nachts naar communityforums brengt: een reconciliatietaak die al 14 uur draait zonder fouten, zonder output en zonder enige indicatie wanneer – of zelfs of – deze zal worden voltooid.

Veelgestelde vragen

Waarom wordt mijn Databricks-afstemmingstaak niet automatisch beëindigd vanwege een time-out?+
Databricks-taken hebben geen standaard maximale looptijd. Tenzij je expliciet timeout_seconds instelt in de taakconfiguratie via de Jobs API of de gebruikersinterface, wordt een taak uitgevoerd totdat deze is geslaagd, mislukt met een fout of handmatig wordt geannuleerd. Voor reconciliatieworkloads die kunnen vastlopen op scheve joins of shuffle spills zonder een uitzondering te genereren, betekent dit dat de taak oneindig lang doorloopt.
Hoe kan ik vaststellen of scheefheid in de gegevens ervoor zorgt dat mijn afstemmingsproces vastloopt?+
Open de Spark UI vanuit de Databricks-clusterpagina, ga naar het tabblad Stages en klik op de stage waarin de join wordt uitgevoerd. Bekijk de duur per taak. Als 197 van de 200 taken binnen een minuut zijn voltooid, maar drie taken al uren draaien, hebben die taken een onevenredig groot deel van de data ontvangen vanwege scheve join-sleutels. De oplossing is om spark.sql.shuffle.partitions te verhogen of je reconciliatie op te splitsen in kleinere, op datum of sleutelbereik gebaseerde passes.
Beschermt automatische beëindiging van clusters me tegen Databricks-taken die uit de hand lopen?+
Nee. Automatische beëindiging is alleen van toepassing op universele (interactieve) clusters die inactief zijn geweest. Taakclusters worden aangemaakt wanneer een run start en vernietigd wanneer deze eindigt. Als de run nooit eindigt, blijft het taakcluster actief en blijven de kosten zich ophopen. Je hebt een timeout_seconds-regel nodig voor de taak en eventueel een health rule om dit te voorkomen.

Gerelateerde integraties

Gerelateerde artikelen