# my_dag.py (1.5 KB)
  1. from airflow import DAG
  2. from airflow.operators.python import PythonOperator, BranchPythonOperator
  3. from airflow.operators.bash import BashOperator
  4. from random import randint
  5. from datetime import datetime
  6. def _choose_best_model(ti):
  7. accuracies = ti.xcom_pull(task_ids=[
  8. 'training_model_A',
  9. 'training_model_B',
  10. 'training_model_C'
  11. ])
  12. best_accuracy = max(accuracies)
  13. if (best_accuracy > 8):
  14. return 'accurate'
  15. return 'inaccurate'
  16. def _training_model():
  17. return randint(1, 10)
  18. with DAG("my_dag", start_date=datetime(2021, 1, 1),
  19. schedule_interval="@daily", catchup=False) as dag:
  20. training_model_A = PythonOperator(
  21. task_id="training_model_A",
  22. python_callable=_training_model
  23. )
  24. training_model_B = PythonOperator(
  25. task_id="training_model_B",
  26. python_callable=_training_model
  27. )
  28. training_model_C = PythonOperator(
  29. task_id="training_model_C",
  30. python_callable=_training_model
  31. )
  32. choose_best_model = BranchPythonOperator(
  33. task_id="choose_best_model",
  34. python_callable=_choose_best_model
  35. )
  36. accurate = BashOperator(
  37. task_id="accurate",
  38. bash_command="echo 'accurate'"
  39. )
  40. inaccurate = BashOperator(
  41. task_id="inaccurate",
  42. bash_command="echo 'inaccurate'"
  43. )
  44. [training_model_A, training_model_B, training_model_C] >> choose_best_model >> [accurate, inaccurate]