dag_template.py.jinja2

from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.configuration import conf
from airflow.utils.task_group import TaskGroup
import requests

job_id = {{ job_id }}
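
{#
  Context this template expects (inferred from the variables referenced below):
    job_id          -- numeric job id, emitted verbatim into the module
    user_name       -- suffix for the per-user `name` variable
    dag_id          -- id of the generated DAG
    trigger_status  -- truthy -> the DAG is created unpaused
    spark_nodes     -- list of dicts: name, desc, id, sub_nodes, edges
    nodes           -- list of dicts: operator_name, id, name, image, cmds, env
    edges           -- list of dicts: source_operator_name, target_operator_name
#}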


def task_finish_alert(context):
    # Report a finished task's state back to the job-tracking service.
    print('############### task finish callback! ###################')
    url = 'http://192.168.199.109:18082/jpt/af_run/notification'
    ti = context['ti']
    requests.post(url, json={"data": {
        "job_id": job_id,
        "dag_id": ti.dag_id,
        "task_id": ti.task.task_id,
        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
        "start_time": ti.start_date.timestamp(),
        "end_time": ti.end_date.timestamp(),
        "status": ti.current_state(),
    }})


def dag_begin_alert(context):
    # Fires when the `start` task succeeds, i.e. at the beginning of a DAG run;
    # registers the run with the job-tracking service.
    print('############### dag begin callback! ###################')
    url = 'http://192.168.199.109:18082/jpt/af_run'
    ti = context['ti']
    requests.post(url, json={"data": {
        "job_id": job_id,
        "dag_id": ti.dag_id,
        "task_id": ti.task.task_id,
        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
        "start_time": ti.start_date.timestamp(),
        "end_time": ti.end_date.timestamp(),
        "status": ti.current_state(),
    }})
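
# For reference, a payload as posted above might look like this (values are
# illustrative; the receiving service's expected schema is assumed, not known):
#   {"data": {"job_id": 42, "dag_id": "my_dag", "task_id": "start",
#             "run_ts": "20220601T000000", "start_time": 1654048800.0,
#             "end_time": 1654048805.2, "status": "success"}}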


print('enter dag run!')
namespace = conf.get("kubernetes", "NAMESPACE")
# per-user name for the generated DAG (not referenced further below)
name = "dag_user{{ user_name }}"

# instantiate the DAG; a truthy trigger_status means the DAG is created unpaused
with DAG(
    dag_id="{{ dag_id }}",
    start_date=datetime(2022, 6, 1),
    schedule_interval='@daily',
    catchup=False,
    is_paused_upon_creation=not {{ trigger_status }},
) as dag:
    op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)

{% for spark_node in spark_nodes %}
    with TaskGroup("{{ spark_node['name'] }}", prefix_group_id=False, tooltip="{{ spark_node['desc'] }}") as op_{{ spark_node['id'] }}:
{% for spark_sub_node in spark_node['sub_nodes'] %}
        op_{{ spark_sub_node['id'] }} = KubernetesPodOperator(
            task_id="{{ spark_sub_node['id'] }}",
            image="{{ spark_sub_node['image'] }}",
            in_cluster=True,
            namespace=namespace,
            name="{{ spark_sub_node['name'] }}",
            random_name_suffix=True,
            # nested quoting emits a literal ts_nodash macro string into the
            # generated DAG, so Airflow resolves it at run time
            labels={'app': 'backend', 'env': 'dev', 'run_ts': {{ "'{{ ts_nodash }}'" }}},
            reattach_on_restart=True,
            is_delete_operator_pod=False,
            get_logs=True,
            log_events_on_failure=True,
            cmds={{ spark_sub_node['cmds'] }},
            env_vars={{ spark_sub_node['env'] }},
            on_success_callback=task_finish_alert,
            on_failure_callback=task_finish_alert,
        )
{% endfor %}
{% for edge in spark_node['edges'] %}
        op_{{ edge[0] }} >> op_{{ edge[1] }}
{% endfor %}
{% endfor %}
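
{#
  Sketch of what one rendered TaskGroup block could look like (node values
  here are hypothetical):

    with TaskGroup("etl", prefix_group_id=False, tooltip="extract/transform") as op_g1:
        op_s1 = KubernetesPodOperator(task_id="s1", image="spark:3.3", ...)
        op_s2 = KubernetesPodOperator(task_id="s2", image="spark:3.3", ...)
        op_s1 >> op_s2
#}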

{% for node in nodes %}
    {{ node['operator_name'] }} = KubernetesPodOperator(
        task_id="{{ node['id'] }}",
        image="{{ node['image'] }}",
        in_cluster=True,
        namespace=namespace,
        name="{{ node['name'] }}",
        random_name_suffix=True,
        labels={'app': 'backend', 'env': 'dev', 'run_ts': {{ "'{{ ts_nodash }}'" }}},
        reattach_on_restart=True,
        is_delete_operator_pod=False,
        get_logs=True,
        log_events_on_failure=True,
        cmds={{ node['cmds'] }},
        env_vars={{ node['env'] }},
        on_success_callback=task_finish_alert,
        on_failure_callback=task_finish_alert,
    )
    op_start >> {{ node['operator_name'] }}
{% endfor %}
{% for edge in edges %}
    {{ edge['source_operator_name'] }} >> {{ edge['target_operator_name'] }}
{% endfor %}
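
{#
  A minimal sketch of how this template might be rendered into a DAG file.
  The loader path, file names, and context values below are assumptions for
  illustration, not part of this repo:

    from jinja2 import Environment, FileSystemLoader

    env = Environment(loader=FileSystemLoader("templates"))
    template = env.get_template("dag_template.py.jinja2")
    source = template.render(
        job_id=42,
        user_name="alice",
        dag_id="alice_job_42",
        trigger_status=True,
        spark_nodes=[],
        nodes=[{"operator_name": "op_n1", "id": "n1", "name": "n1",
                "image": "busybox:1.36", "cmds": ["echo", "hello"],
                "env": {}}],
        edges=[],
    )
    with open("dags/alice_job_42.py", "w") as fh:
        fh.write(source)
#}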