run.py

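"""FastAPI router for Airflow run management (prefix: /jpt/af_run).

Endpoints cover paginated task listing, run-status lookup against the
Airflow REST API, run creation for normal and debug job modes, and
notification/signal callbacks that attach Kubernetes pod logs and task
results to a run record.
"""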
import json

import requests
from fastapi import APIRouter, Depends
from fastapi_pagination import paginate, Params
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app import schemas, get_db, crud
from app.core.airflow.uri import get_airflow_api_info
from app.core.k8s.k8s_client import KubernetesTools
from utils import web_try, sxtimeit


class Item(BaseModel):
    # Generic JSON payload wrapper used by the POST endpoints below.
    data: dict


router_af_run = APIRouter(
    prefix="/jpt/af_run",
    tags=["airflow_run - run management"],
)


@router_af_run.get("/")
@web_try()
@sxtimeit
def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
    # Paginated listing of Airflow task records.
    return paginate(crud.get_airflow_tasks(db), params)


@router_af_run.get("/{run_id}/status")
@web_try()
@sxtimeit
def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
    item = crud.get_airflow_run_once(item_id=run_id, db=db)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
    print(f'job_item.job_mode is {job_item.job_mode}, with af_run_id={item.af_run_id}')
    if job_item.job_mode == 1:  # normal mode
        # Terminal states (2 = success, 3 = failed) are served straight from the database.
        if item.status in [2, 3]:
            return {"status": item.status}
        # Otherwise query the Airflow REST API for the current dag-run state.
        uri_prefix, headers = get_airflow_api_info()
        url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
        state = requests.get(url, headers=headers).json().get('state', None)
        status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
        print(f'status is {status}, with state {state}')
        if item.status != status:  # still queued or running: persist the fresher status
            item.status = status
            item.update(db)
            print(f'after update {item.status}')
        return {"status": status}
    else:  # debug mode: report the live Airflow state without persisting it
        uri_prefix, headers = get_airflow_api_info()
        url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
        state = requests.get(url, headers=headers).json().get('state', None)
        status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
        print(f'status is {status}, with state {state}')
        return {"status": status}
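

# POST / : record a triggered run. Normal-mode jobs (and the first trigger of a
# debug-mode job) get a fresh run record; later debug-mode triggers reuse the
# job's single run record, rebuilding its spark dependence and marking it running.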
@router_af_run.post("/")
@web_try()
@sxtimeit
def add_airflow_run(item: Item, db: Session = Depends(get_db)):
    print(item.data)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    sparks_dependence = {}
    debug_run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
    if job_item.job_mode == 1 or (job_item.job_mode == 2 and debug_run is None):  # normal mode
        # Collect the edge dependence of every spark task, prefixing node ids with the task id.
        for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
            if task.task_type == 'sparks':
                sparks = json.loads(task.script)
                dependence = []
                for (source, sink) in sparks['edges']:
                    dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
                sparks_dependence[task.id] = dependence
        run_item = schemas.AirflowRunCreate(start_time=item.data["start_time"],
                                            job_id=int(item.data["job_id"]),
                                            run_ts=item.data['run_ts'],
                                            af_run_id=item.data['af_run_id'],
                                            details={"tasks": {},
                                                     "dependence": {"tasks": job_item.dependence,
                                                                    "sparks": sparks_dependence}},
                                            status=0)
        crud.create_airflow_run(db, run_item)
    elif job_item.job_mode == 2:  # debug mode: update the job's single existing run record
        run = debug_run
        sparks_task = schemas.AirflowTask(**job_item.tasks[0])
        assert sparks_task.task_type == 'sparks'
        sparks = json.loads(sparks_task.script)
        sparks_dependence[sparks_task.id] = [[f'{sparks_task.id}_{s}', f'{sparks_task.id}_{k}']
                                             for (s, k) in sparks['edges']]
        spark_nodes = [sub_node['id'] for sub_node in sparks['sub_nodes']]
        run.details['dependence']['sparks'] = sparks_dependence
        # Drop details of sub-tasks that are no longer part of the spark graph.
        run.details['tasks'] = {k: v for k, v in run.details['tasks'].items() if k in spark_nodes}
        update_run = schemas.AirflowRunUpdate(details=run.details, status=1,
                                              af_run_id=item.data['af_run_id'])
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
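

# POST /notification : callback reporting a finished task. Pulls the task's pod
# logs from the "airflow" namespace via Kubernetes and stores them, together with
# the reported start/end time and status, in the run's task details.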
@router_af_run.post("/notification")
@web_try()
@sxtimeit
def add_notification(item: Item, db: Session = Depends(get_db)):
    k8s_tool = KubernetesTools()
    labels = {"dag_id": item.data['dag_id'], "task_id": item.data['task_id'], "run_ts": item.data['run_ts']}
    logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    if job_item.job_mode == 1:  # normal mode: one job -> many runs
        run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
    elif job_item.job_mode == 2:  # debug mode: one job -> one run
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
    else:
        run = None
    if run is not None:
        update_run = schemas.AirflowRunUpdate(details=run.details, status=run.status,
                                              af_run_id=item.data['af_run_id'])
        update_run.details['tasks'][item.data['task_id']] = {"log": logs,
                                                             "start_time": item.data["start_time"],
                                                             "end_time": item.data["end_time"],
                                                             "status": item.data['status']}
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
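

# POST /sigal : signal callback from a worker; currently only logs the payload.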
@router_af_run.post("/sigal")
@web_try()
@sxtimeit
def add_signal(item: Item):
    # Signal callback: just log the payload reported by the caller.
    print(f'receive signal: {item.data}')