dag_template.py.jinja2 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. from airflow import DAG
  2. from datetime import datetime
  3. from airflow.operators.empty import EmptyOperator
  4. from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
  5. from airflow.configuration import conf
  6. from airflow.utils.task_group import TaskGroup
  7. import requests
  8. job_id = {{ job_id }}
  9. {% if enable_notify ==true %}
  10. def task_finish_alert(context):
  11. print('############### task begin callback!###################')
  12. url = 'http://{{ af_backend_uri }}/af/af_run/notification'
  13. ti = context['ti']
  14. requests.post(url, json={"data": {
  15. "job_id": job_id,
  16. "dag_id": ti.dag_id,
  17. "task_id": ti.task.task_id,
  18. "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
  19. "af_run_id": context["run_id"],
  20. "start_time": ti.start_date.timestamp(),
  21. "end_time": ti.end_date.timestamp(),
  22. "status": ti.current_state()
  23. }})
  24. def dag_begin_alert(context):
  25. print('############### dag begin callback!###################')
  26. url = 'http://{{ af_backend_uri }}/af/af_run'
  27. ti = context['ti']
  28. requests.post(url, json={"data": {
  29. "job_id": job_id,
  30. "dag_id": ti.dag_id,
  31. "task_id": ti.task.task_id,
  32. "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
  33. "af_run_id": context["run_id"],
  34. "start_time": ti.start_date.timestamp(),
  35. "end_time": ti.end_date.timestamp(),
  36. "status": ti.current_state()
  37. }})
  38. {% else %}
  39. def task_finish_alert(context):
  40. pass
  41. def dag_begin_alert(context):
  42. pass
  43. {% endif %}
  44. namespace = conf.get("kubernetes", "NAMESPACE")
  45. # set the name that will be printed
  46. name = "dag_user{{ user_name }}"
  47. # instantiate the DAG
  48. with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval=None if '{{ interval }}' == 'None' else '{{ interval }}',dag_id="{{ dag_id }}",is_paused_upon_creation=True) as dag:
  49. op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)
  50. {% for spark_node in spark_nodes %}
  51. with TaskGroup("{{ spark_node['name'] }}", prefix_group_id=False, tooltip="{{ spark_node['desc'] }}") as op_{{ spark_node['id'] }}:
  52. {% for spark_sub_node in spark_node['sub_nodes'] %}
  53. op_{{ spark_sub_node['id'] }} = KubernetesPodOperator(
  54. task_id="{{ spark_sub_node["id"] }}",
  55. image="{{ spark_sub_node['image'] }}",
  56. in_cluster=True,
  57. namespace=namespace,
  58. name="{{ spark_sub_node['name'] }}",
  59. random_name_suffix=True,
  60. labels={'app':'backend', 'env':'dev', 'run_ts':{{ "'{{ ts_nodash }}'" }} },
  61. reattach_on_restart=True,
  62. startup_timeout_seconds=600,
  63. is_delete_operator_pod=False,
  64. get_logs=True,
  65. {% if image_pull_key != None or image_pull_key != ""%}image_pull_secrets='{{ image_pull_key }}',{% endif %}
  66. log_events_on_failure=True,
  67. image_pull_policy='Always',
  68. cmds={{ spark_sub_node['cmds'] }},
  69. env_vars={{ spark_sub_node['env']}},
  70. on_success_callback=task_finish_alert,
  71. on_failure_callback=task_finish_alert
  72. )
  73. {% endfor %}
  74. {% for edge in spark_node['edges'] %}
  75. op_{{edge[0] }} >> op_{{edge[1] }}
  76. {% endfor %}
  77. {% endfor %}
  78. {% for node in nodes %}
  79. {{ node['operator_name'] }} = KubernetesPodOperator(
  80. task_id="{{ node["id"] }}",
  81. image="{{ node['image'] }}",
  82. in_cluster=True,
  83. namespace=namespace,
  84. name="{{ node['name'] }}",
  85. random_name_suffix=True,
  86. labels={'app':'backend', 'env':'dev', 'run_ts':{{ "'{{ ts_nodash }}'" }} },
  87. reattach_on_restart=True,
  88. is_delete_operator_pod=False,
  89. get_logs=True,
  90. image_pull_policy='Always',
  91. log_events_on_failure=True,
  92. pod_template_file='/opt/airflow/pod_templates/pod_template_file.yaml',
  93. {% if image_pull_key not in ['None',"",None]%}image_pull_secrets='{{ image_pull_key }}',{% endif %}
  94. cmds={{ node['cmds'] }},
  95. env_vars= { **{{ node['env'] }},
  96. **{"p1":{{"\""+ "{{dag_run.conf.get('p1',None)}}" +"\""}},
  97. "p2":{{"\""+ "{{dag_run.conf.get('p2',None)}}" +"\""}},
  98. "p3":{{"\""+ "{{dag_run.conf.get('p3',None)}}" +"\""}},
  99. "p4":{{"\""+ "{{dag_run.conf.get('p4',None)}}" +"\""}},
  100. "p5":{{"\""+ "{{dag_run.conf.get('p5',None)}}" +"\""}},
  101. "p6":{{"\""+ "{{dag_run.conf.get('p6',None)}}" +"\""}},}
  102. },
  103. on_success_callback=task_finish_alert,
  104. on_failure_callback=task_finish_alert
  105. )
  106. op_start >> {{ node['operator_name'] }}
  107. {% endfor %}
  108. {% for edge in edges%}
  109. {{ edge['source_operator_name'] }} >> {{ edge['target_operator_name'] }}
  110. {% endfor %}