from datetime import datetime

import requests

from airflow import DAG
from airflow.configuration import conf
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.task_group import TaskGroup

# Rendered once at DAG-generation time by the job service.
job_id = {{ job_id }}


def task_finish_alert(context):
    """Post the final state of a task instance back to the job service."""
    print('############### task finish callback! ###################')
    url = 'http://192.168.199.109:18082/jpt/af_run/notification'
    ti = context['ti']
    requests.post(url, json={"data": {
        "job_id": job_id,
        "dag_id": ti.dag_id,
        "task_id": ti.task.task_id,
        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
        "start_time": ti.start_date.timestamp(),
        "end_time": ti.end_date.timestamp(),
        "status": ti.current_state(),
    }})


def dag_begin_alert(context):
    """Notify the job service that a DAG run has started."""
    print('############### dag begin callback! ###################')
    url = 'http://192.168.199.109:18082/jpt/af_run'
    ti = context['ti']
    requests.post(url, json={"data": {
        "job_id": job_id,
        "dag_id": ti.dag_id,
        "task_id": ti.task.task_id,
        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
        "start_time": ti.start_date.timestamp(),
        "end_time": ti.end_date.timestamp(),
        "status": ti.current_state(),
    }})


print('enter dag run!')

# Run worker pods in the same namespace Airflow itself is deployed in.
namespace = conf.get("kubernetes", "NAMESPACE")

# Name of the user the generated DAG belongs to.
name = "dag_user{{ user_name }}"

# Instantiate the DAG.
with DAG(
    dag_id="{{ dag_id }}",
    start_date=datetime(2022, 6, 1),
    schedule_interval='@daily',
    catchup=False,
    is_paused_upon_creation=not {{ trigger_status }},
) as dag:
    op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)

{% for spark_node in spark_nodes %}
    with TaskGroup("{{ spark_node['name'] }}", prefix_group_id=False,
                   tooltip="{{ spark_node['desc'] }}") as op_{{ spark_node['id'] }}:
{% for spark_sub_node in spark_node['sub_nodes'] %}
        op_{{ spark_sub_node['id'] }} = KubernetesPodOperator(
            task_id="{{ spark_sub_node['id'] }}",
            image="{{ spark_sub_node['image'] }}",
            in_cluster=True,
            namespace=namespace,
            name="{{ spark_sub_node['name'] }}",
            random_name_suffix=True,
            # run_ts stays a literal '{{ "{{ ts_nodash }}" }}' in the generated file,
            # so Airflow fills it in at run time, not at generation time.
            labels={'app': 'backend', 'env': 'dev', 'run_ts': {{ "'{{ ts_nodash }}'" }}},
            reattach_on_restart=True,
            is_delete_operator_pod=False,
            get_logs=True,
            log_events_on_failure=True,
            cmds={{ spark_sub_node['cmds'] }},
            env_vars={{ spark_sub_node['env'] }},
            on_success_callback=task_finish_alert,
            on_failure_callback=task_finish_alert,
        )
{% endfor %}
{% for edge in spark_node['edges'] %}
        op_{{ edge[0] }} >> op_{{ edge[1] }}
{% endfor %}
{% endfor %}

{% for node in nodes %}
    {{ node['operator_name'] }} = KubernetesPodOperator(
        task_id="{{ node['id'] }}",
        image="{{ node['image'] }}",
        in_cluster=True,
        namespace=namespace,
        name="{{ node['name'] }}",
        random_name_suffix=True,
        labels={'app': 'backend', 'env': 'dev', 'run_ts': {{ "'{{ ts_nodash }}'" }}},
        reattach_on_restart=True,
        is_delete_operator_pod=False,
        get_logs=True,
        log_events_on_failure=True,
        cmds={{ node['cmds'] }},
        env_vars={{ node['env'] }},
        on_success_callback=task_finish_alert,
        on_failure_callback=task_finish_alert,
    )
    op_start >> {{ node['operator_name'] }}
{% endfor %}

{% for edge in edges %}
    {{ edge['source_operator_name'] }} >> {{ edge['target_operator_name'] }}
{% endfor %}
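{#
  Rendering sketch (hypothetical). This template is filled in by the backend
  before the result is written into the Airflow dags/ folder. The context keys
  below mirror the placeholders used above; the file names and sample values
  are assumptions for illustration, not part of the real service.

      from jinja2 import Template

      with open('dag_template.py.j2') as f:
          rendered = Template(f.read()).render(
              job_id=42,
              user_name='alice',
              dag_id='job_42_dag',
              trigger_status=True,   # False -> the DAG is created paused
              spark_nodes=[],        # [{'id', 'name', 'desc', 'sub_nodes', 'edges'}, ...]
              nodes=[{
                  'id': 'clean_data',
                  'operator_name': 'op_clean_data',
                  'name': 'clean-data',
                  'image': 'backend/clean:latest',
                  'cmds': ['python', 'clean.py'],   # rendered as a Python list literal
                  'env': {'STAGE': 'dev'},          # rendered as a Python dict literal
              }],
              edges=[],              # [{'source_operator_name', 'target_operator_name'}, ...]
          )

      with open('/opt/airflow/dags/job_42_dag.py', 'w') as f:
          f.write(rendered)
#}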