"""Airflow DAG `single_empty_task`: an EmptyOperator followed by a
KubernetesPodOperator, with task status reported to an external HTTP
signal endpoint via DAG-level success/failure callbacks.

NOTE(review): the module-level send_sigal() calls run every time the
scheduler parses this file (not once per DAG run) — confirm that is the
intended behavior.
"""
import airflow
from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.configuration import conf
import requests

# Kubernetes namespace the pods are launched in, from airflow.cfg.
namespace = conf.get("kubernetes", "NAMESPACE")

# set the name that will be printed
name = "luoyulong"
job_id = 10000
run_id = 0  # rebound by task_finish() to the run's timestamp string

# Notification endpoint, shared by both reporting helpers (was duplicated).
_SIGNAL_URL = 'http://192.168.199.109:18082/jpt/jpt_run/sigal'

# Fix: requests.post without a timeout can block forever; a hung call at
# parse time stalls the scheduler's DAG-parsing loop.
_SIGNAL_TIMEOUT = 5


def send_sigal(msg):
    """POST a free-form status message to the signal endpoint.

    Network failures are caught and logged instead of propagating: this
    function is called at module import time, and an unreachable endpoint
    must not make the whole DAG file fail to parse.

    :param msg: text prepended to the reported message.
    """
    try:
        requests.post(
            _SIGNAL_URL,
            json={"data": {'msg': f'{msg} run id ={run_id}'}},
            timeout=_SIGNAL_TIMEOUT,
        )
    except requests.RequestException as exc:
        # Best-effort notification only; never break DAG parsing.
        print(f'send_sigal failed: {exc}')


def task_finish(context):
    """DAG-level on_success/on_failure callback: report the finished
    task's id and state to the signal endpoint.

    Side effect: rebinds the module global ``run_id`` to the execution
    date formatted as ``YYYYMMDDTHHMMSS`` (note: int -> str type change).

    NOTE(review): whether 'ti' is present in a DAG-level callback context
    depends on the Airflow version — verify against the deployed version.

    :param context: Airflow callback context; must contain 'ti'.
    """
    ti = context['ti']
    run_ts = ti.execution_date.strftime('%Y%m%dT%H%M%S')
    global run_id
    run_id = run_ts
    try:
        requests.post(
            _SIGNAL_URL,
            json={"data": {'msg': f' run id ={run_id}',
                           'task_id': ti.task.task_id,
                           'status': ti.current_state()}},
            timeout=_SIGNAL_TIMEOUT,
        )
    except requests.RequestException as exc:
        # A reporting failure should not mark the callback itself failed.
        print(f'task_finish failed: {exc}')


send_sigal('begin ')

# instantiate the DAG
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule_interval='@daily',
    dag_id="single_empty_task",
    on_success_callback=task_finish,
    on_failure_callback=task_finish,
) as dag:
    op_test = EmptyOperator(task_id='xxx')

    op_20 = KubernetesPodOperator(
        task_id="task_20_by_33",
        image="SXKJ:32775/pod_python:1.1",
        in_cluster=True,
        namespace=namespace,
        name="python_task_1664728953",
        random_name_suffix=True,
        # run_ts label is templated from the logical date at runtime.
        labels={'app': 'backend', 'env': 'dev', 'run_ts': "{{ts_nodash }}"},
        reattach_on_restart=True,
        is_delete_operator_pod=False,  # keep finished pods for debugging
        get_logs=True,
        log_events_on_failure=True,
        cmds=['/bin/bash', '-c', 'echo 1 '],
    )

    op_test >> op_20

send_sigal('end')