run.py 9.1 KB

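# FastAPI router for Airflow run management: listing run records, querying run/task
# status, creating run records, and receiving notification callbacks from task pods.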
import datetime
import json
import time
from collections import defaultdict

import requests
from fastapi import APIRouter, Depends
from fastapi_pagination import paginate, Params
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app import schemas, get_db, crud
from app.core.airflow.af_util import get_airflow_api_info, call_airflow_api, datetime2timestamp
from app.core.k8s.k8s_client import KubernetesTools
from utils import web_try, sxtimeit

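# Generic request body wrapper: callers post an arbitrary payload under "data".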
class Item(BaseModel):
    data: dict


router_af_run = APIRouter(
    prefix="/af/af_run",
    tags=["airflow_run-运行管理"],  # "airflow run - run management"
)

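# GET / - paginated listing of run records (backed by crud.get_airflow_tasks).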
@router_af_run.get("/")
@web_try()
@sxtimeit
def get_runs(params: Params = Depends(), db: Session = Depends(get_db)):
    return paginate(crud.get_airflow_tasks(db), params)

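# GET /{run_id}/status - resolve a run's status, consulting the Airflow REST API
# when the local record is not yet in a terminal state (2 = success, 3 = failed).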
@router_af_run.get("/{run_id}/status")
@web_try()
@sxtimeit
def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
    item = crud.get_airflow_run_once(item_id=run_id, db=db)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
    print(f'job_item.job_mode is {job_item.job_mode}, with af run id = {item.af_run_id}')
    if job_item.job_mode == 1:  # normal mode
        if item.status in [2, 3]:  # already in a terminal state, trust the local record
            return {"status": item.status}
        else:
            # still queued/running locally: ask the Airflow REST API for the current dag-run state
            uri_prefix, headers = get_airflow_api_info()
            url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
            state = requests.get(url, headers=headers).json().get('state', None)
            status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
            print(f'status is {status}, with state {state}')
            if item.status != status:  # queued or running state changed, persist it
                item.status = status
                item.update(db)
                print(f'after update {item.status}')
            return {"status": status}
    else:
        # other modes (e.g. debug): always read the current state from the Airflow REST API
        uri_prefix, headers = get_airflow_api_info()
        url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
        state = requests.get(url, headers=headers).json().get('state', None)
        status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
        print(f'status is {status}, with state {state}')
        return {"status": status}

@router_af_run.post("/")
@web_try()
@sxtimeit
def add_airflow_run(item: Item, db: Session = Depends(get_db)):
    print(item.data)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    sparks_dependence = {}
    debug_run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
    if job_item.job_mode == 1 or (job_item.job_mode == 2 and debug_run is None):  # normal mode (or first debug run)
        # build the spark sub-task dependence list from each sparks task's script edges
        for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
            if task.task_type == 'sparks':
                sparks = json.loads(task.script)
                dependence = []
                for (source, sink) in sparks['edges']:
                    dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
                sparks_dependence[task.id] = dependence
        item = schemas.AirflowRunCreate(**{"start_time": item.data["start_time"],
                                           "job_id": int(item.data["job_id"]),
                                           "run_ts": item.data['run_ts'],
                                           "af_run_id": item.data['af_run_id'],
                                           "details": {"tasks": {},
                                                       "dependence": {"tasks": job_item.dependence,
                                                                      "sparks": sparks_dependence}},
                                           "status": 0})
        crud.create_airflow_run(db, item)
    elif job_item.job_mode == 2:  # debug mode: reuse and update the existing debug run record
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
        sparks_task = schemas.AirflowTask(**job_item.tasks[0])
        assert sparks_task.task_type == 'sparks'
        sparks = json.loads(sparks_task.script)
        sparks_dependence[sparks_task.id] = [[f'{sparks_task.id}_{s}', f'{sparks_task.id}_{k}']
                                             for (s, k) in sparks['edges']]
        spark_nodes = [sub_node['id'] for sub_node in sparks['sub_nodes']]
        run.details['dependence']['sparks'] = sparks_dependence
        # keep only the task details that still belong to the current spark sub-nodes
        run.details['tasks'] = {k: v for k, v in run.details['tasks'].items() if k in spark_nodes}
        update_run = schemas.AirflowRunUpdate(
            **{"details": run.details, "status": 1, "af_run_id": item.data['af_run_id']})
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)

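# POST /notification - callback for a finished task: collect the pod logs from the
# "airflow" namespace and write the task's log, timing and status into run.details.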
@router_af_run.post("/notification")
@web_try()
@sxtimeit
def add_notification(item: Item, db: Session = Depends(get_db)):
    k8s_tool = KubernetesTools()
    labels = {"dag_id": item.data['dag_id'], "task_id": item.data['task_id'], "run_ts": item.data['run_ts']}
    logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    if job_item.job_mode == 1:  # normal mode: one job -> many runs
        run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
    elif job_item.job_mode == 2:  # debug mode: one job -> one run
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
    else:
        run = None
    if run is not None:
        update_run = schemas.AirflowRunUpdate(
            **{"details": run.details, "status": run.status, "af_run_id": item.data['af_run_id']})
        update_run.details['tasks'][item.data['task_id']] = {"log": logs,
                                                             "start_time": item.data["start_time"],
                                                             "end_time": item.data["end_time"],
                                                             "status": item.data['status']}
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)

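# POST /sigal - debugging hook that only echoes the received payload.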
@router_af_run.post("/sigal")
@web_try()
@sxtimeit
def add_signal(item: Item):  # renamed to avoid clashing with add_notification above
    print(f'receive signal: {item.data}')

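# GET /tasks_status/{job_id}/{af_run_id} - fetch the task instances of a dag run from
# the Airflow API and return per-task start/end timestamps and state.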
@router_af_run.get("/tasks_status/{job_id}/{af_run_id}")
@web_try()
@sxtimeit
def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances', args_dict={})
    details = defaultdict(dict)
    for task in ret.json()['task_instances']:
        details['tasks'][task['task_id']] = {
            # "log": logs,
            "start_time": datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
            "end_time": datetime.datetime.strptime(task['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
            "status": task['state']
        }
        # print(f"{task['task_id']}:{task['duration']}")
    return details

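# GET /running_status/{job_id}/{af_run_id} - same task-instance summary as above, but
# using datetime2timestamp for date conversion; the dag-run info is fetched but not yet
# used (see the commented-out draft kept inside the function).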
@router_af_run.get("/running_status/{job_id}/{af_run_id}")
@web_try()
@sxtimeit
def get_airflow_dagrun_running_status(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
    job_info = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}', args_dict={})
    tasks_info = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances',
                                  args_dict={})
    details = defaultdict(dict)
    for task in tasks_info.json()['task_instances']:
        details['tasks'][task['task_id']] = {
            # "log": logs,
            "start_time": datetime2timestamp(task['start_date']),
            "end_time": datetime2timestamp(task['end_date']),
            "status": task['state']
        }
        # print(f"{task['task_id']}:{task['duration']}")
    # draft of persisting the run state, kept for reference:
    # item = schemas.AirflowRunUpdate(**{# "start_time": item.data["start_time"],
    #                                    # "job_id": int(job_id),
    #                                    # "run_ts": item.data['run_ts'],
    #                                    # "af_run_id": item.data['af_run_id'],
    #                                    "end_time": datetime2timestamp(),
    #                                    "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
    #                                                                            "sparks": sparks_dependence}},
    #                                    "status": 0})
    # item = schemas.AirflowRunCreate(**{"start_time": item.data["start_time"],
    #                                    "job_id": int(job_id),
    #                                    # "run_ts": item.data['run_ts'],
    #                                    # "af_run_id": item.data['af_run_id'],
    #                                    "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
    #                                                                            "sparks": sparks_dependence}},
    #                                    "status": 0})
    # return ret.json()
    return details  # the collected details were built but never returned in the original; return them here

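# GET /task_log/{job_id}/{af_run_id}/{task_id} - return the raw log text of a task
# instance (logs/1, i.e. the first try) from the Airflow API.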
@router_af_run.get("/task_log/{job_id}/{af_run_id}/{task_id}")
@web_try()
@sxtimeit
def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str):
    ret = call_airflow_api(method='get',
                           uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances/{task_id}/logs/1',
                           args_dict={})
    return {"log": ret.text}