Apache Airflow est un système de planification de flux de travaux open source largement utilisé pour orchestrer et automatiser des tâches complexes. Dans le cadre de la définition des flux de travail, il est essentiel de comprendre la "règle de déclenchement" (trigger_rule), qui joue un rôle crucial dans la gestion du déclenchement des tâches à l'intérieur d'un flux de travail.
Qu'est-ce que la Règle de Déclenchement (trigger_rule) ?
La règle de déclenchement (trigger_rule) dans Apache Airflow définit le comportement à adopter lorsqu'une tâche dépendante échoue ou réussit. Elle permet de spécifier comment le déclenchement des tâches doit être affecté par le statut des tâches précédentes.
Il existe plusieurs options de règles de déclenchement que l'on peut appliquer à une tâche :
- all_success: La tâche sera déclenchée si toutes les tâches précédentes réussissent.
- all_failed: La tâche sera déclenchée si toutes les tâches précédentes échouent.
- one_success: La tâche sera déclenchée si au moins une tâche précédente réussit.
- one_failed: La tâche sera déclenchée si au moins une tâche précédente échoue.
- none_failed: La tâche sera déclenchée si aucune tâche précédente n'échoue.
- none_failed_or_skipped: La tâche sera déclenchée si aucune tâche précédente n'échoue ou n'est ignorée.
1. all_success
Dans cette règle, la tâche sera déclenchée si toutes les tâches précédentes réussissent.
from airflow.decorators import dag, task
from datetime import datetime
from airflow.utils.dates import days_ago
@dag(schedule_interval='@daily', start_date=days_ago(1))
def all_success_example():
@task(task_id='task_a')
def task_a():
return "Task A successful"
@task(task_id='task_b')
def task_b():
return "Task B successful"
@task(task_id='task_c')
def task_c():
return "Task C successful"
task_a_output = task_a()
task_b_output = task_b()
task_c_output = task_c()
task_a_output >> task_b_output >> task_c_output
dag = all_success_example()
2. all_failed
Dans cette règle, la tâche sera déclenchée si toutes les tâches précédentes échouent.
# ... (importations)
@dag(schedule_interval='@daily', start_date=days_ago(1))
def all_failed_example():
@task(task_id='task_a')
def task_a():
raise Exception("Task A failed")
@task(task_id='task_b')
def task_b():
raise Exception("Task B failed")
@task(task_id='task_c')
def task_c():
raise Exception("Task C failed")
task_a() >> task_b() >> task_c()
dag = all_failed_example()
3. one_success
Dans cette règle, la tâche sera déclenchée si au moins une tâche précédente réussit.
# ... (importations)
@dag(schedule_interval='@daily', start_date=days_ago(1))
def one_success_example():
@task(task_id='task_a')
def task_a():
return "Task A successful"
@task(task_id='task_b')
def task_b():
raise Exception("Task B failed")
@task(task_id='task_c')
def task_c():
return "Task C successful"
task_a_output = task_a()
task_b_output = task_b()
task_c_output = task_c()
task_a_output >> task_c_output
dag = one_success_example()
4. one_failed
Dans cette règle, la tâche sera déclenchée si au moins une tâche précédente échoue.
# ... (importations)
@dag(schedule_interval='@daily', start_date=days_ago(1))
def one_failed_example():
@task(task_id='task_a')
def task_a():
return "Task A successful"
@task(task_id='task_b')
def task_b():
return "Task B successful"
@task(task_id='task_c')
def task_c():
raise Exception("Task C failed")
task_a_output = task_a()
task_b_output = task_b()
task_c_output = task_c()
task_a_output >> task_b_output >> task_c_output
dag = one_failed_example()
5. none_failed
Dans cette règle, la tâche sera déclenchée si aucune tâche précédente n'échoue.
# ... (importations)
@dag(schedule_interval='@daily', start_date=days_ago(1))
def none_failed_example():
@task(task_id='task_a')
def task_a():
return "Task A successful"
@task(task_id='task_b')
def task_b():
return "Task B successful"
@task(task_id='task_c')
def task_c():
return "Task C successful"
task_a_output = task_a()
task_b_output = task_b()
task_c_output = task_c()
task_a_output >> task_b_output >> task_c_output
dag = none_failed_example()
6. none_failed_or_skipped
Dans cette règle, la tâche sera déclenchée si aucune tâche précédente n'échoue ou n'est ignorée.
# ... (importations)
@dag(schedule_interval='@daily', start_date=days_ago(1))
def none_failed_or_skipped_example():
@task(task_id='task_a')
def task_a():
return "Task A successful"
@task(task_id='task_b')
def task_b():
return "Task B successful"
@task(task_id='task_c')
def task_c():
return "Task C successful"
task_a_output = task_a()
task_b_output = task_b()
task_c_output = task_c()
task_a_output >> task_b_output >> task_c_output
dag = none_failed_or_skipped_example()
Conclusion:
Les règles de déclenchement d'Apache Airflow offrent une flexibilité et un contrôle puissants sur le flux de travail. En choisissant judicieusement parmi ces règles, vous pouvez créer des flux de travail résilients et adaptatifs. Ces exemples devraient servir de guide pour tirer le meilleur parti des règles de déclenchement dans vos projets Airflow. Que votre chemin soit pavé de succès continu ou parsemé de défis ponctuels, ces règles vous aideront à orchestrer des flux de travail robustes.