12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- import airflow
- from airflow import DAG
- from datetime import datetime
- from airflow.operators.empty import EmptyOperator
- from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
- from airflow.configuration import conf
- import requests
- namespace = conf.get("kubernetes", "NAMESPACE")
- # set the name that will be printed
- name = "luoyulong"
- job_id = 10000
- run_id = 0
- def send_sigal(msg):
- url = 'http://192.168.199.109:18082/jpt/jpt_run/sigal'
- requests.post(url, json={"data": {'msg': f'{msg} run id ={run_id}'}})
- def task_finish(context):
- url = 'http://192.168.199.109:18082/jpt/jpt_run/sigal'
- ti = context['ti']
- run_ts = ti.execution_date.strftime('%Y%m%dT%H%M%S')
- global run_id
- run_id = run_ts
- requests.post(url, json={"data":
- {'msg': f' run id ={run_id}',
- 'task_id': ti.task.task_id,
- 'status': ti.current_state()}
- })
- send_sigal('begin ')
- # instantiate the DAG
- with DAG(
- start_date=datetime(2022, 6, 1),
- catchup=False,
- schedule_interval='@daily',
- dag_id="single_empty_task",
- on_success_callback=task_finish,
- on_failure_callback=task_finish
- ) as dag:
- op_test = EmptyOperator(task_id='xxx')
- op_20 = KubernetesPodOperator(
- task_id="task_20_by_33",
- image="SXKJ:32775/pod_python:1.1",
- in_cluster=True,
- namespace=namespace,
- name="python_task_1664728953",
- random_name_suffix=True,
- labels={'app': 'backend', 'env': 'dev', 'run_ts': "{{ts_nodash }}"},
- reattach_on_restart=True,
- is_delete_operator_pod=False,
- get_logs=True,
- log_events_on_failure=True,
- cmds=['/bin/bash', '-c', 'echo 1 '],
- )
- op_test >> op_20
- send_sigal('end')
|