123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- from airflow import DAG
- from datetime import datetime
- from airflow.operators.empty import EmptyOperator
- from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
- from airflow.configuration import conf
- from airflow.utils.task_group import TaskGroup
- import requests
# Identifier of the job this DAG was generated for; substituted when the
# template is rendered and sent in the payload of the notification callbacks.
# NOTE(review): the value is rendered unquoted, so the template input must be
# a valid Python literal (e.g. an integer) — confirm against the renderer.
job_id = {{ job_id }}
- {% if enable_notify ==true %}
def task_finish_alert(context):
    """Report a finished task instance to the backend notification endpoint.

    Installed as both ``on_success_callback`` and ``on_failure_callback`` of
    the generated pod tasks, so it fires for either outcome.

    Args:
        context: Airflow callback context. ``context['ti']`` (the task
            instance) and ``context['run_id']`` are read.

    Raises:
        requests.RequestException: if the backend cannot be reached or does
            not answer within the timeout.
    """
    print('############### task finish callback!###################')
    url = 'http://{{ af_backend_uri }}/af/af_run/notification'
    ti = context['ti']
    payload = {
        "job_id": job_id,
        "dag_id": ti.dag_id,
        "task_id": ti.task.task_id,
        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
        "af_run_id": context["run_id"],
        "start_time": ti.start_date.timestamp(),
        "end_time": ti.end_date.timestamp(),
        "status": ti.current_state(),
    }
    # Without a timeout, requests waits forever on an unresponsive backend,
    # which would block the worker running this callback.
    requests.post(url, json={"data": payload}, timeout=10)
def dag_begin_alert(context):
    """Report the start of a DAG run to the backend.

    Installed as the ``on_success_callback`` of the DAG's ``start`` task, so
    it fires once that entry task has succeeded.

    Args:
        context: Airflow callback context. ``context['ti']`` (the task
            instance) and ``context['run_id']`` are read.

    Raises:
        requests.RequestException: if the backend cannot be reached or does
            not answer within the timeout.
    """
    print('############### dag begin callback!###################')
    url = 'http://{{ af_backend_uri }}/af/af_run'
    ti = context['ti']
    payload = {
        "job_id": job_id,
        "dag_id": ti.dag_id,
        "task_id": ti.task.task_id,
        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
        "af_run_id": context["run_id"],
        "start_time": ti.start_date.timestamp(),
        "end_time": ti.end_date.timestamp(),
        "status": ti.current_state(),
    }
    # Without a timeout, requests waits forever on an unresponsive backend,
    # which would block the worker running this callback.
    requests.post(url, json={"data": payload}, timeout=10)
- {% else %}
def task_finish_alert(context):
    """No-op stand-in for the task callback when notifications are disabled."""
    return None
def dag_begin_alert(context):
    """No-op stand-in for the DAG-start callback when notifications are disabled."""
    return None
- {% endif %}
# Kubernetes namespace passed to every pod operator below, read from the
# NAMESPACE option of the [kubernetes] section of the Airflow configuration.
namespace = conf.get("kubernetes", "NAMESPACE")
# Per-user name, with the user name substituted at render time.
# NOTE(review): nothing in the visible template references `name` — confirm
# whether it is still needed.
name = "dag_user{{ user_name }}"
# Instantiate the DAG. All Jinja control tags below sit at column 0 so the
# indentation of the rendered Python does not depend on the Jinja
# environment's trim_blocks / lstrip_blocks settings.
with DAG(
    dag_id="{{ dag_id }}",
    start_date=datetime(2022, 6, 1),
    catchup=False,
    # The template input 'None' (as text) means "no schedule".
    schedule_interval=None if '{{ interval }}' == 'None' else '{{ interval }}',
    is_paused_upon_creation=True,
) as dag:
    # Entry task; its success callback reports the start of the run.
    op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)
{# Spark nodes: one TaskGroup per node, one pod task per sub-node. These groups are not wired to op_start here. #}
{% for spark_node in spark_nodes %}
    with TaskGroup("{{ spark_node['name'] }}", prefix_group_id=False, tooltip="{{ spark_node['desc'] }}") as op_{{ spark_node['id'] }}:
        pass  # keeps the block valid Python when the group renders no sub-nodes or edges
{% for spark_sub_node in spark_node['sub_nodes'] %}
        op_{{ spark_sub_node['id'] }} = KubernetesPodOperator(
            task_id="{{ spark_sub_node["id"] }}",
            image="{{ spark_sub_node['image'] }}",
            in_cluster=True,
            namespace=namespace,
            name="{{ spark_sub_node['name'] }}",
            random_name_suffix=True,
            # '{{ "{{ ts_nodash }}" }}' is emitted verbatim and left for Airflow's own templating.
            labels={'app':'backend', 'env':'dev', 'run_ts':{{ "'{{ ts_nodash }}'" }} },
            reattach_on_restart=True,
            startup_timeout_seconds=600,
            is_delete_operator_pod=False,
            get_logs=True,
            {% if image_pull_key not in ['None',"",None]%}image_pull_secrets='{{ image_pull_key }}',{% endif %}
            log_events_on_failure=True,
            image_pull_policy='Always',
            cmds={{ spark_sub_node['cmds'] }},
            env_vars={{ spark_sub_node['env']}},
            on_success_callback=task_finish_alert,
            on_failure_callback=task_finish_alert,
        )
{% endfor %}
{# Dependencies between sub-nodes of the same group, given as (upstream id, downstream id) pairs. #}
{% for edge in spark_node['edges'] %}
        op_{{edge[0] }} >> op_{{edge[1] }}
{% endfor %}
{% endfor %}
{# Plain nodes: one pod task each, always downstream of op_start. #}
{% for node in nodes %}
    {{ node['operator_name'] }} = KubernetesPodOperator(
        task_id="{{ node["id"] }}",
        image="{{ node['image'] }}",
        in_cluster=True,
        namespace=namespace,
        name="{{ node['name'] }}",
        random_name_suffix=True,
        labels={'app':'backend', 'env':'dev', 'run_ts':{{ "'{{ ts_nodash }}'" }} },
        reattach_on_restart=True,
        is_delete_operator_pod=False,
        get_logs=True,
        image_pull_policy='Always',
        log_events_on_failure=True,
        pod_template_file='/opt/airflow/pod_templates/pod_template_file.yaml',
        {% if image_pull_key not in ['None',"",None]%}image_pull_secrets='{{ image_pull_key }}',{% endif %}
        cmds={{ node['cmds'] }},
        env_vars={
            **{{ node['env'] }},
            # Run-time parameters p1..p6: each value is emitted as a literal
            # Airflow template that looks the key up in dag_run.conf.
            **{
{% for idx in range(1, 7) %}
                "p{{ idx }}": "{{ '{{' }}dag_run.conf.get('p{{ idx }}',None){{ '}}' }}",
{% endfor %}
            },
        },
        on_success_callback=task_finish_alert,
        on_failure_callback=task_finish_alert,
    )
    op_start >> {{ node['operator_name'] }}
{% endfor %}
{# Dependencies between plain nodes. #}
{% for edge in edges%}
    {{ edge['source_operator_name'] }} >> {{ edge['target_operator_name'] }}
{% endfor %}
|