En cliquant sur "Accepter", vous acceptez que des cookies soient stockés sur votre appareil afin d'améliorer la navigation sur le site, d'analyser l'utilisation du site et de nous aider dans nos efforts de marketing.

Comprendre et Utiliser les Règles de Déclenchement (trigger_rule) dans Apache Airflow

fantôme data
Blog
>
Comprendre et Utiliser les Règles de Déclenchement (trigger_rule) dans Apache Airflow
Factorii
23/11/2023

Apache Airflow est un système de planification de flux de travaux open source largement utilisé pour orchestrer et automatiser des tâches complexes. Dans le cadre de la définition des flux de travail, il est essentiel de comprendre la "règle de déclenchement" (trigger_rule), qui joue un rôle crucial dans la gestion du déclenchement des tâches à l'intérieur d'un flux de travail.

Qu'est-ce que la Règle de Déclenchement (trigger_rule) ?


La règle de déclenchement (trigger_rule) dans Apache Airflow définit le comportement à adopter lorsqu'une tâche dépendante échoue ou réussit. Elle permet de spécifier comment le déclenchement des tâches doit être affecté par le statut des tâches précédentes.

Il existe plusieurs options de règles de déclenchement que l'on peut appliquer à une tâche :

  1. all_success: La tâche sera déclenchée si toutes les tâches précédentes réussissent.
  2. all_failed: La tâche sera déclenchée si toutes les tâches précédentes échouent.
  3. one_success: La tâche sera déclenchée si au moins une tâche précédente réussit.
  4. one_failed: La tâche sera déclenchée si au moins une tâche précédente échoue.
  5. none_failed: La tâche sera déclenchée si aucune tâche précédente n'échoue.
  6. none_failed_or_skipped: La tâche sera déclenchée si aucune tâche précédente n'échoue ou n'est ignorée.

1. all_success

Dans cette règle, la tâche sera déclenchée si toutes les tâches précédentes réussissent.

from airflow.decorators import dag, task from datetime import datetime from airflow.utils.dates import days_ago @dag(schedule_interval='@daily', start_date=days_ago(1)) def all_success_example(): @task(task_id='task_a') def task_a(): return "Task A successful" @task(task_id='task_b') def task_b(): return "Task B successful" @task(task_id='task_c') def task_c(): return "Task C successful" task_a_output = task_a() task_b_output = task_b() task_c_output = task_c() task_a_output >> task_b_output >> task_c_output dag = all_success_example()

2. all_failed

Dans cette règle, la tâche sera déclenchée si toutes les tâches précédentes échouent.

# ... (importations) @dag(schedule_interval='@daily', start_date=days_ago(1)) def all_failed_example(): @task(task_id='task_a') def task_a(): raise Exception("Task A failed") @task(task_id='task_b') def task_b(): raise Exception("Task B failed") @task(task_id='task_c') def task_c(): raise Exception("Task C failed") task_a() >> task_b() >> task_c() dag = all_failed_example()

3. one_success

Dans cette règle, la tâche sera déclenchée si au moins une tâche précédente réussit.

# ... (importations) @dag(schedule_interval='@daily', start_date=days_ago(1)) def one_success_example(): @task(task_id='task_a') def task_a(): return "Task A successful" @task(task_id='task_b') def task_b(): raise Exception("Task B failed") @task(task_id='task_c') def task_c(): return "Task C successful" task_a_output = task_a() task_b_output = task_b() task_c_output = task_c() task_a_output >> task_c_output dag = one_success_example()

4. one_failed

Dans cette règle, la tâche sera déclenchée si au moins une tâche précédente échoue.

# ... (importations) @dag(schedule_interval='@daily', start_date=days_ago(1)) def one_failed_example(): @task(task_id='task_a') def task_a(): return "Task A successful" @task(task_id='task_b') def task_b(): return "Task B successful" @task(task_id='task_c') def task_c(): raise Exception("Task C failed") task_a_output = task_a() task_b_output = task_b() task_c_output = task_c() task_a_output >> task_b_output >> task_c_output dag = one_failed_example()

5. none_failed

Dans cette règle, la tâche sera déclenchée si aucune tâche précédente n'échoue.

# ... (importations) @dag(schedule_interval='@daily', start_date=days_ago(1)) def none_failed_example(): @task(task_id='task_a') def task_a(): return "Task A successful" @task(task_id='task_b') def task_b(): return "Task B successful" @task(task_id='task_c') def task_c(): return "Task C successful" task_a_output = task_a() task_b_output = task_b() task_c_output = task_c() task_a_output >> task_b_output >> task_c_output dag = none_failed_example()

6. none_failed_or_skipped

Dans cette règle, la tâche sera déclenchée si aucune tâche précédente n'échoue ou n'est ignorée.

# ... (importations) @dag(schedule_interval='@daily', start_date=days_ago(1)) def none_failed_or_skipped_example(): @task(task_id='task_a') def task_a(): return "Task A successful" @task(task_id='task_b') def task_b(): return "Task B successful" @task(task_id='task_c') def task_c(): return "Task C successful" task_a_output = task_a() task_b_output = task_b() task_c_output = task_c() task_a_output >> task_b_output >> task_c_output dag = none_failed_or_skipped_example()

Conclusion:


Les règles de déclenchement d'Apache Airflow offrent une flexibilité et un contrôle puissants sur le flux de travail. En choisissant judicieusement parmi ces règles, vous pouvez créer des flux de travail résilients et adaptatifs. Ces exemples devraient servir de guide pour tirer le meilleur parti des règles de déclenchement dans vos projets Airflow. Que votre chemin soit pavé de succès continu ou parsemé de défis ponctuels, ces règles vous aideront à orchestrer des flux de travail robustes.

Raphael
Raphael
Guild Master