run.py

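"""FastAPI router for Airflow run management (mounted under /af/af_run).

It exposes endpoints for listing runs, querying run and task status,
recording runs and task notifications reported back by Airflow, and
fetching task logs.
"""
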
import datetime
import json
import time
from collections import defaultdict

import requests
from fastapi import APIRouter, Depends
from fastapi_pagination import paginate, Params
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app import schemas, get_db, crud
from app.core.airflow.af_util import get_airflow_api_info, call_airflow_api, datetime2timestamp
from utils import web_try, sxtimeit


class Item(BaseModel):
    data: dict


router_af_run = APIRouter(
    prefix="/af/af_run",
    tags=["airflow_run - run management"],
)

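# Run/task status codes used throughout this module (inferred from the
# state-to-status mappings below): 0 = queued, 1 = running, 2 = success,
# 3 = failed, -1 = unknown.

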
@router_af_run.get("/")
@web_try()
@sxtimeit
def get_runs(params: Params = Depends(), db: Session = Depends(get_db)):
    return paginate(crud.get_airflow_tasks(db), params)


@router_af_run.get("/{run_id}/status")
@web_try()
@sxtimeit
def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
    """Return the status of one run, refreshing it from Airflow when it is not yet terminal."""
    item = crud.get_airflow_run_once(item_id=run_id, db=db)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
    print(f'job_item.job_mode is {job_item.job_mode}, with af_run_id = {item.af_run_id}')
    if job_item.job_mode == 1:  # normal mode
        if item.status in [2, 3]:  # already finished: success or failed
            return {"status": item.status}
        else:
            uri_prefix, headers = get_airflow_api_info()
            url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
            state = requests.get(url, headers=headers).json().get('state', None)
            status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
            print(f'status is {status}, with state {state}')
            if item.status != status:  # still queued or running: persist the fresh status
                item.status = status
                item.update(db)
                print(f'after update {item.status}')
            return {"status": status}
    else:
        uri_prefix, headers = get_airflow_api_info()
        url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
        state = requests.get(url, headers=headers).json().get('state', None)
        status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
        print(f'status is {status}, with state {state}')
        return {"status": status}


@router_af_run.post("/")
@web_try()
@sxtimeit
def add_airflow_run(item: Item, db: Session = Depends(get_db)):
    """Create a run record (normal mode) or reset the single debug run (debug mode)."""
    print(item.data)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    sparks_dependence = {}
    debug_run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)

    if job_item.job_mode == 1 or (job_item.job_mode == 2 and debug_run is None):  # normal mode
        # collect the edge list of every sparks task as intra-task dependencies
        for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
            if task.task_type == 'sparks':
                sparks = json.loads(task.script)
                dependence = []
                for (source, sink) in sparks['edges']:
                    dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
                sparks_dependence[task.id] = dependence

        item = schemas.AirflowRunCreate(**{"start_time": item.data["start_time"],
                                           "job_id": int(item.data["job_id"]),
                                           "run_ts": item.data['run_ts'],
                                           "af_run_id": item.data['af_run_id'],
                                           "details": {"tasks": {},
                                                       "dependence": {"tasks": job_item.dependence,
                                                                      "sparks": sparks_dependence}},
                                           "status": 0})
        crud.create_airflow_run(db, item)
    elif job_item.job_mode == 2:  # debug mode
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
        sparks_task = schemas.AirflowTask(**job_item.tasks[0])
        assert sparks_task.task_type == 'sparks'
        sparks = json.loads(sparks_task.script)
        sparks_dependence[sparks_task.id] = [[f'{sparks_task.id}_{s}', f'{sparks_task.id}_{k}']
                                             for (s, k) in sparks['edges']]
        spark_nodes = [sub_node['id'] for sub_node in sparks['sub_nodes']]
        run.details['dependence']['sparks'] = sparks_dependence
        run.details['tasks'] = {k: v for k, v in run.details['tasks'].items() if k in spark_nodes}
        update_run = schemas.AirflowRunUpdate(
            **{"details": run.details, "status": 1, "af_run_id": item.data['af_run_id']})
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)


@router_af_run.post("/notification")
@web_try()
@sxtimeit
def add_notification(item: Item, db: Session = Depends(get_db)):
    """Record the final state and log of one task instance reported back by Airflow."""
    state_uri = f"dags/{item.data['dag_id']}/dagRuns/{item.data['af_run_id']}/taskInstances/{item.data['task_id']}"
    log_uri = f"{state_uri}/logs/1"

    # poll Airflow until the task instance reaches a terminal state, at most 10 times
    try_count = 10
    while try_count > 0:
        state = call_airflow_api("get", state_uri, {}).json().get('state', None)
        if state in ['success', 'failed']:
            break
        time.sleep(1)
        try_count -= 1

    logs = call_airflow_api("get", log_uri, {}).text.encode('raw_unicode_escape').decode('utf-8')
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    if job_item.job_mode == 1:  # normal mode: one job -> many runs
        run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], job_id=job_item.id, db=db)
    elif job_item.job_mode == 2:  # debug mode: one job -> one run
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
    else:
        run = None

    if run is not None:
        update_run = schemas.AirflowRunUpdate(
            **{"details": run.details, "status": run.status, "af_run_id": item.data['af_run_id']})
        update_run.details['tasks'][item.data['task_id']].update({"log": logs,
                                                                  "start_time": item.data["start_time"],
                                                                  "end_time": item.data["end_time"],
                                                                  "status": item.data['status']})
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)


@router_af_run.post("/sigal")
@web_try()
@sxtimeit
def receive_signal(item: Item):
    print(f'receive signal: {item.data}')


@router_af_run.get("/tasks_status/{job_id}/{af_run_id}")
@web_try()
@sxtimeit
def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
    """Return start time, end time, and state for every task instance of a DAG run."""
    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances', args_dict={})
    details = defaultdict(dict)
    for task in ret.json()['task_instances']:
        details['tasks'][task['task_id']] = {
            # "log": logs,
            "start_time": datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
            "end_time": datetime.datetime.strptime(task['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
            "status": task['state']
        }
        # print(f"{task['task_id']}:{task['duration']}")
    return details


@router_af_run.get("/running_status/{job_id}/{af_run_id}")
@web_try()
@sxtimeit
def get_airflow_dagrun_running_status(job_id: int, af_run_id: str):
    """Return the overall start time, end time, and state of a DAG run."""
    job_uri = f'dags/dag_{job_id}/dagRuns/{af_run_id}'
    job_ret = call_airflow_api(method='get', uri=job_uri, args_dict={})
    if job_ret.status_code != 200:
        raise Exception(f'cannot find this job run, please check your input: job uri is {job_uri}')
    return {
        "start_time": datetime2timestamp(job_ret.json()['start_date']),
        "end_time": datetime2timestamp(job_ret.json()['end_date']),
        "status": job_ret.json()['state']
    }


@router_af_run.get("/task_log/{job_id}/{af_run_id}/{task_id}")
@web_try()
@sxtimeit
def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
    """Return the log and state of one task, serving the cached copy once the task is terminal."""
    state_uri = f"dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances/{task_id}"
    log_uri = f"{state_uri}/logs/1"
    job_item = crud.get_airflow_job_once(db=db, item_id=job_id)
    print(f'job_mode is {job_item.job_mode}, af_run_id is {af_run_id}')
    if job_item.job_mode == 1:  # normal mode: one job -> many runs
        run = crud.get_airflow_run_once_normal_mode(af_run_id=af_run_id, job_id=job_id, db=db)
    elif job_item.job_mode == 2:  # debug mode: one job -> one run
        run = crud.get_airflow_run_once_debug_mode(job_id=job_id, db=db)
    else:
        run = None

    if run is not None:
        if run.details['tasks'].get(task_id, {}).get("status", "running") not in ["success", "failed"]:
            # the cached task info is not terminal yet: refresh state and log from Airflow
            state_ret = call_airflow_api(method='get', uri=state_uri, args_dict={})
            log_ret = call_airflow_api(method='get', uri=log_uri, args_dict={})
            if state_ret.status_code != 200 or log_ret.status_code != 200:
                return None
            update_run = schemas.AirflowRunUpdate(
                **{"details": run.details, "status": run.status, "af_run_id": af_run_id})
            task_info = {
                "log": log_ret.text.encode('raw_unicode_escape').decode('utf-8'),
                "status": state_ret.json()['state'],
                "execution_time": datetime2timestamp(state_ret.json()['execution_date']),
                "start_time": datetime2timestamp(state_ret.json()['start_date']),
                "end_time": datetime2timestamp(state_ret.json()['end_date']),
            }
            update_run.details['tasks'][task_id] = task_info
            crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
            return task_info
        else:
            return run.details['tasks'][task_id]


@router_af_run.get("/data_transfer_log/{af_run_id}")
@web_try()
@sxtimeit
def get_airflow_data_transfer_log(af_run_id: str):
    """Return the log and state of the data-transfer DAG run (dag_0, task '0')."""
    state_uri = f"dags/dag_0/dagRuns/{af_run_id}/taskInstances/0"
    log_uri = f"{state_uri}/logs/1"
    state_ret = call_airflow_api(method='get', uri=state_uri, args_dict={})
    log_ret = call_airflow_api(method='get', uri=log_uri, args_dict={})
    return {
        "log": log_ret.text,
        "status": state_ret.json()['state'],
        "execution_time": datetime2timestamp(state_ret.json()['execution_date']),
        "start_time": datetime2timestamp(state_ret.json()['start_date']),
        "end_time": datetime2timestamp(state_ret.json()['end_date']),
    }
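

# A minimal sketch of how this router might be mounted on the application; the
# module path "app.routers.run" and the app object below are assumptions for
# illustration, not taken from this repository:
#
#   from fastapi import FastAPI
#   from app.routers.run import router_af_run
#
#   app = FastAPI()
#   app.include_router(router_af_run)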