run.py

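"""FastAPI router for Airflow run management (mounted under /jpt/af_run).

Endpoints: list tasks, query a run's status against the Airflow REST API,
create runs for normal and debug job modes, and receive task notifications
whose logs are pulled from the Kubernetes pods that executed them.
"""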
import json

import requests
from fastapi import APIRouter, Depends
from fastapi_pagination import paginate, Params
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app import schemas, get_db, crud
from app.core.airflow.uri import get_airflow_api_info
from app.core.k8s.k8s_client import KubernetesTools
from utils import web_try, sxtimeit

class Item(BaseModel):
    data: dict


router_af_run = APIRouter(
    prefix="/jpt/af_run",
    tags=["airflow_run - run management"],
)

@router_af_run.get("/")
@web_try()
@sxtimeit
def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
    return paginate(crud.get_airflow_tasks(db), params)

@router_af_run.get("/{run_id}/status")
@web_try()
@sxtimeit
def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
    item = crud.get_airflow_run_once(item_id=run_id, db=db)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
    if job_item.job_mode == 1:  # normal mode
        if item.status in [2, 3]:  # already success or failed, nothing to refresh
            return {"status": item.status}
        else:
            # Still queued or running: ask the Airflow REST API for the live DAG-run state.
            uri_prefix, headers = get_airflow_api_info()
            url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
            state = requests.get(url, headers=headers).json().get('state', None)
            status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
            print(f'status is {status}, with state {state}')
            if item.status != status:  # queued or running
                item.status = status
                item.update(db)
                print(f'after update {item.status}')
            return {"status": status}
    else:
        # Debug mode: always report the live state from Airflow without persisting it.
        uri_prefix, headers = get_airflow_api_info()
        url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
        state = requests.get(url, headers=headers).json().get('state', None)
        status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
        print(f'status is {status}, with state {state}')
        return {"status": status}

@router_af_run.post("/")
@web_try()
@sxtimeit
def add_airflow_run(item: Item, db: Session = Depends(get_db)):
    print(item.data)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    sparks_dependence = {}
    debug_run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)

    if job_item.job_mode == 1 or (job_item.job_mode == 2 and debug_run is None):  # normal mode
        # Collect the intra-task dependence of every "sparks" task from its script.
        for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
            if task.task_type == 'sparks':
                sparks = json.loads(task.script)
                dependence = []
                for (source, sink) in sparks['edges']:
                    dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
                sparks_dependence[task.id] = dependence

        run_item = schemas.AirflowRunCreate(**{"start_time": item.data["start_time"],
                                               "job_id": int(item.data["job_id"]),
                                               "run_ts": item.data['run_ts'],
                                               "af_run_id": item.data['af_run_id'],
                                               "details": {"tasks": {},
                                                           "dependence": {"tasks": job_item.dependence,
                                                                          "sparks": sparks_dependence}},
                                               "status": 0},
                                            )
        crud.create_airflow_run(db, run_item)
    elif job_item.job_mode == 2:  # debug mode: reuse the existing debug run
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
        sparks_task = schemas.AirflowTask(**job_item.tasks[0])
        assert sparks_task.task_type == 'sparks'
        sparks = json.loads(sparks_task.script)
        sparks_dependence[sparks_task.id] = [[f'{sparks_task.id}_{s}', f'{sparks_task.id}_{k}']
                                             for (s, k) in sparks['edges']]
        spark_nodes = [sub_node['id'] for sub_node in sparks['sub_nodes']]
        run.details['dependence']['sparks'] = sparks_dependence
        # Keep only the task details that belong to the sub-nodes of this sparks task.
        run.details['tasks'] = {k: v for k, v in run.details['tasks'].items() if k in spark_nodes}
        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": 1})
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)

@router_af_run.post("/notification")
@web_try()
@sxtimeit
def add_notification(item: Item, db: Session = Depends(get_db)):
    # Fetch the logs of the pod that executed this task, matched by its labels.
    k8s_tool = KubernetesTools()
    labels = {"dag_id": item.data['dag_id'], "task_id": item.data['task_id'], "run_ts": item.data['run_ts']}
    logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])

    if job_item.job_mode == 1:  # normal mode, one job -> many runs
        run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
    elif job_item.job_mode == 2:  # debug mode, one job -> one run
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
    else:
        run = None

    if run is not None:
        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
        update_run.details['tasks'][item.data['task_id']] = {"log": logs,
                                                             "start_time": item.data["start_time"],
                                                             "end_time": item.data["end_time"],
                                                             "status": item.data['status']}
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)

@router_af_run.post("/signal")
@web_try()
@sxtimeit
def add_signal(item: Item):
    print(f'receive signal: {item.data}')
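
# A minimal usage sketch (not part of this module): how router_af_run could be
# mounted on a FastAPI application. The import path `app.routers.run` and the app
# factory below are assumptions for illustration; the real project layout may differ.
#
#     from fastapi import FastAPI
#     from app.routers.run import router_af_run  # hypothetical import path
#
#     app = FastAPI()
#     app.include_router(router_af_run)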