|
@@ -2,41 +2,60 @@ import airflow
|
|
|
from airflow import DAG
|
|
|
from datetime import datetime
|
|
|
from airflow.operators.empty import EmptyOperator
|
|
|
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
|
|
|
from airflow.configuration import conf
|
|
|
import requests
|
|
|
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
|
|
|
|
|
|
namespace = conf.get("kubernetes", "NAMESPACE")
|
|
|
|
|
|
# set the name that will be printed
|
|
|
name = "luoyulong"
|
|
|
+job_id = 10000
|
|
|
+run_id = 0
|
|
|
|
|
|
|
|
|
-def task_success_alert(context: airflow.utils.context.Context):
|
|
|
- url = 'http://192.168.199.109:18082/jpt/af_run/notification'
|
|
|
- print('############### enter success callback!')
|
|
|
- # task_instance_key_str = context['task_instance_key_str']
|
|
|
- # infos = task_instance_key_str.split('__')
|
|
|
- print(type(context))
|
|
|
- print(context.keys())
|
|
|
- print(context)
|
|
|
- # requests.post(url, data={"data": {
|
|
|
- requests.post(url, json={"data": {'run_id': 123}})
|
|
|
+def send_sigal(msg):
|
|
|
+ url = 'http://192.168.199.109:18082/jpt/af_run/sigal'
|
|
|
+ requests.post(url, json={"data": {'msg': f'{msg} run id ={run_id}'}})
|
|
|
|
|
|
|
|
|
-def task_begin_alert(context):
|
|
|
- # pass
|
|
|
- print('############### enter begin callback!')
|
|
|
- url = 'http://192.168.199.109:18082/jpt/af_run/notification'
|
|
|
- requests.post(url, json={"data": {'run_id': "begin to task exec "}})
|
|
|
+def task_finish(context):
|
|
|
+ url = 'http://192.168.199.109:18082/jpt/af_run/sigal'
|
|
|
+ ti = context['ti']
|
|
|
+ run_ts = ti.execution_date.strftime('%Y%m%dT%H%M%S')
|
|
|
+ global run_id
|
|
|
+ run_id = run_ts
|
|
|
+ requests.post(url, json={"data":
|
|
|
+ {'msg': f' run id ={run_id}',
|
|
|
+ 'task_id': ti.task.task_id,
|
|
|
+ 'status': ti.current_state()}
|
|
|
+ })
|
|
|
|
|
|
|
|
|
+send_sigal('begin ')
|
|
|
# instantiate the DAG
|
|
|
with DAG(
|
|
|
start_date=datetime(2022, 6, 1),
|
|
|
catchup=False,
|
|
|
schedule_interval='@daily',
|
|
|
- dag_id="single_empty_task"
|
|
|
+ dag_id="single_empty_task",
|
|
|
+ on_success_callback=task_finish,
|
|
|
+ on_failure_callback=task_finish
|
|
|
) as dag:
|
|
|
- op_test = EmptyOperator(task_id='empty_op', on_success_callback=task_success_alert,
|
|
|
- pre_execute=task_begin_alert)
|
|
|
+ op_test = EmptyOperator(task_id='xxx')
|
|
|
+ op_20 = KubernetesPodOperator(
|
|
|
+ task_id="task_20_by_33",
|
|
|
+ image="SXKJ:32775/pod_python:1.1",
|
|
|
+ in_cluster=True,
|
|
|
+ namespace=namespace,
|
|
|
+ name="python_task_1664728953",
|
|
|
+ random_name_suffix=True,
|
|
|
+ labels={'app': 'backend', 'env': 'dev', 'run_ts': "{{ts_nodash }}"},
|
|
|
+ reattach_on_restart=True,
|
|
|
+ is_delete_operator_pod=False,
|
|
|
+ get_logs=True,
|
|
|
+ log_events_on_failure=True,
|
|
|
+ cmds=['/bin/bash', '-c', 'echo 1 '],
|
|
|
+ )
|
|
|
+ op_test >> op_20
|
|
|
+send_sigal('end')
|