from datetime import datetime

import requests

from airflow import DAG
from airflow.configuration import conf
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.task_group import TaskGroup

job_id = {{ job_id }}

{% if enable_notify == true %}
def task_finish_alert(context):
    """Report a finished task instance to the backend."""
    print('############### task finish callback! ###################')
    url = 'http://{{ af_backend_uri }}/af/af_run/notification'
    ti = context['ti']
    requests.post(url, json={"data": {
        "job_id": job_id,
        "dag_id": ti.dag_id,
        "task_id": ti.task.task_id,
        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
        "af_run_id": context["run_id"],
        "start_time": ti.start_date.timestamp(),
        "end_time": ti.end_date.timestamp(),
        "status": ti.current_state()
    }})


def dag_begin_alert(context):
    """Register a new DAG run with the backend (fires when the start task succeeds)."""
    print('############### dag begin callback! ###################')
    url = 'http://{{ af_backend_uri }}/af/af_run'
    ti = context['ti']
    requests.post(url, json={"data": {
        "job_id": job_id,
        "dag_id": ti.dag_id,
        "task_id": ti.task.task_id,
        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
        "af_run_id": context["run_id"],
        "start_time": ti.start_date.timestamp(),
        "end_time": ti.end_date.timestamp(),
        "status": ti.current_state()
    }})
{% else %}
def task_finish_alert(context):
    pass


def dag_begin_alert(context):
    pass
{% endif %}

namespace = conf.get("kubernetes", "NAMESPACE")

# display name recording which user this DAG was generated for
name = "dag_user{{ user_name }}"

# instantiate the DAG; an interval of 'None' means manual triggering only
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule_interval=None if '{{ interval }}' == 'None' else '{{ interval }}',
    dag_id="{{ dag_id }}",
    is_paused_upon_creation=True,
) as dag:
    op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)

{% for spark_node in spark_nodes %}
    # one TaskGroup per Spark node, holding its sub-tasks and their internal edges
    with TaskGroup("{{ spark_node['name'] }}", prefix_group_id=False,
                   tooltip="{{ spark_node['desc'] }}") as op_{{ spark_node['id'] }}:
{% for spark_sub_node in spark_node['sub_nodes'] %}
        op_{{ spark_sub_node['id'] }} = KubernetesPodOperator(
            task_id="{{ spark_sub_node['id'] }}",
            image="{{ spark_sub_node['image'] }}",
            in_cluster=True,
            namespace=namespace,
            name="{{ spark_sub_node['name'] }}",
            random_name_suffix=True,
            labels={'app': 'backend', 'env': 'dev', 'run_ts': {{ "'{{ ts_nodash }}'" }}},
            reattach_on_restart=True,
            startup_timeout_seconds=600,
            is_delete_operator_pod=False,
            get_logs=True,
{% if image_pull_key not in ['None', '', None] %}
            image_pull_secrets='{{ image_pull_key }}',
{% endif %}
            log_events_on_failure=True,
            image_pull_policy='Always',
            cmds={{ spark_sub_node['cmds'] }},
            env_vars={{ spark_sub_node['env'] }},
            on_success_callback=task_finish_alert,
            on_failure_callback=task_finish_alert,
        )
{% endfor %}
{% for edge in spark_node['edges'] %}
        op_{{ edge[0] }} >> op_{{ edge[1] }}
{% endfor %}
{% endfor %}

{% for node in nodes %}
    {{ node['operator_name'] }} = KubernetesPodOperator(
        task_id="{{ node['id'] }}",
        image="{{ node['image'] }}",
        in_cluster=True,
        namespace=namespace,
        name="{{ node['name'] }}",
        random_name_suffix=True,
        labels={'app': 'backend', 'env': 'dev', 'run_ts': {{ "'{{ ts_nodash }}'" }}},
        reattach_on_restart=True,
        is_delete_operator_pod=False,
        get_logs=True,
        image_pull_policy='Always',
        log_events_on_failure=True,
        pod_template_file='/opt/airflow/pod_templates/pod_template_file.yaml',
{% if image_pull_key not in ['None', '', None] %}
        image_pull_secrets='{{ image_pull_key }}',
{% endif %}
        cmds={{ node['cmds'] }},
        # merge the node's own env with p1..p6 read from dag_run.conf; the
        # values are emitted as quoted Jinja expressions so that Airflow
        # templates them at run time (missing keys render as the string "None")
        env_vars={
            **{{ node['env'] }},
            **{
                "p1": {{ "\"" + "{{ dag_run.conf.get('p1', None) }}" + "\"" }},
                "p2": {{ "\"" + "{{ dag_run.conf.get('p2', None) }}" + "\"" }},
                "p3": {{ "\"" + "{{ dag_run.conf.get('p3', None) }}" + "\"" }},
                "p4": {{ "\"" + "{{ dag_run.conf.get('p4', None) }}" + "\"" }},
                "p5": {{ "\"" + "{{ dag_run.conf.get('p5', None) }}" + "\"" }},
                "p6": {{ "\"" + "{{ dag_run.conf.get('p6', None) }}" + "\"" }},
            },
        },
        on_success_callback=task_finish_alert,
        on_failure_callback=task_finish_alert,
    )
    op_start >> {{ node['operator_name'] }}
{% endfor %}

{% for edge in edges %}
    {{ edge['source_operator_name'] }} >> {{ edge['target_operator_name'] }}
{% endfor %}