# manual_dummy_task.py
  1. import airflow
  2. from airflow import DAG
  3. from datetime import datetime
  4. from airflow.operators.empty import EmptyOperator
  5. from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
  6. from airflow.configuration import conf
  7. import requests
  8. namespace = conf.get("kubernetes", "NAMESPACE")
  9. # set the name that will be printed
  10. name = "luoyulong"
  11. job_id = 10000
  12. run_id = 0
  13. def send_sigal(msg):
  14. url = 'http://192.168.199.109:18082/jpt/jpt_run/sigal'
  15. requests.post(url, json={"data": {'msg': f'{msg} run id ={run_id}'}})
  16. def task_finish(context):
  17. url = 'http://192.168.199.109:18082/jpt/jpt_run/sigal'
  18. ti = context['ti']
  19. run_ts = ti.execution_date.strftime('%Y%m%dT%H%M%S')
  20. global run_id
  21. run_id = run_ts
  22. requests.post(url, json={"data":
  23. {'msg': f' run id ={run_id}',
  24. 'task_id': ti.task.task_id,
  25. 'status': ti.current_state()}
  26. })
  27. send_sigal('begin ')
  28. # instantiate the DAG
  29. with DAG(
  30. start_date=datetime(2022, 6, 1),
  31. catchup=False,
  32. schedule_interval='@daily',
  33. dag_id="single_empty_task",
  34. on_success_callback=task_finish,
  35. on_failure_callback=task_finish
  36. ) as dag:
  37. op_test = EmptyOperator(task_id='xxx')
  38. op_20 = KubernetesPodOperator(
  39. task_id="task_20_by_33",
  40. image="SXKJ:32775/pod_python:1.1",
  41. in_cluster=True,
  42. namespace=namespace,
  43. name="python_task_1664728953",
  44. random_name_suffix=True,
  45. labels={'app': 'backend', 'env': 'dev', 'run_ts': "{{ts_nodash }}"},
  46. reattach_on_restart=True,
  47. is_delete_operator_pod=False,
  48. get_logs=True,
  49. log_events_on_failure=True,
  50. cmds=['/bin/bash', '-c', 'echo 1 '],
  51. )
  52. op_test >> op_20
  53. send_sigal('end')