job.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import datetime
  2. import json
  3. import os
  4. from typing import Optional
  5. import requests
  6. from fastapi import APIRouter, Depends
  7. from fastapi_pagination import paginate, Params
  8. from sqlalchemy.orm import Session
  9. from app import schemas, get_db, crud
  10. from app.core.airflow.af_util import get_job_path, get_airflow_api_info, call_airflow_api
  11. from app.crud import create_airflow_job_submit
  12. from app.schemas import AirflowTrigger
  13. from utils import web_try, sxtimeit
  14. router_af_job = APIRouter(
  15. prefix="/af/af_job",
  16. tags=["airflow_job-任务管理"],
  17. )
  18. @router_af_job.get("/")
  19. @web_try()
  20. @sxtimeit
  21. def get_af_jobs(params: Params = Depends(), db: Session = Depends(get_db)):
  22. return paginate(crud.get_airflow_jobs(db), params)
  23. @router_af_job.get("/{item_id}/last_parsed_time")
  24. @web_try()
  25. @sxtimeit
  26. def get_af_jobs(item_id: int):
  27. uri_prefix, headers = get_airflow_api_info()
  28. url = f'{uri_prefix}/dags/dag_{item_id}'
  29. rets = requests.get(url, headers=headers).json()
  30. if 'last_parsed_time' in rets:
  31. last_parsed_time_str = rets['last_parsed_time']
  32. last_parsed_time = datetime.datetime.strptime(last_parsed_time_str, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
  33. else:
  34. last_parsed_time = None
  35. return {"last_parsed_time": last_parsed_time}
  36. @router_af_job.get("/getOnce/{item_id}")
  37. @web_try()
  38. @sxtimeit
  39. def get_af_jobs_once(item_id: int, db: Session = Depends(get_db)):
  40. return crud.get_airflow_job_once(db, item_id)
  41. @router_af_job.post("/")
  42. @web_try()
  43. @sxtimeit
  44. def add_af_job(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
  45. return crud.create_airflow_job(db, item)
  46. @router_af_job.put("/{item_id}")
  47. @web_try()
  48. @sxtimeit
  49. def update_af_job(item_id: int, update_item: schemas.AirflowJobUpdate, db: Session = Depends(get_db)):
  50. return crud.update_airflow_job(db=db, item_id=item_id, update_item=update_item)
  51. @router_af_job.post("/submit")
  52. @web_try()
  53. @sxtimeit
  54. def add_dag_submit(id: int, db: Session = Depends(get_db)):
  55. item = crud.get_airflow_job_once(db, id)
  56. create_airflow_job_submit(schemas.AirflowJob(**item.to_dict()))
  57. @router_af_job.post("/{job_id}/run")
  58. @web_try()
  59. @sxtimeit
  60. def trigger_af_job_run(job_id: int):
  61. return call_airflow_api(method='post', uri=f'dags/dag_{job_id}/dagRuns', args_dict={"data": json.dumps({})}).json()
  62. @router_af_job.post("/{job_id}/adv_run")
  63. @web_try()
  64. @sxtimeit
  65. def trigger_af_job_adv_run(job_id: int, parameters: Optional[AirflowTrigger]):
  66. return call_airflow_api(method='post', uri=f'dags/dag_{job_id}/dagRuns',
  67. args_dict={"json": {"conf": parameters.parameters if parameters else {}}}).json()
  68. @router_af_job.post("/000/data_transfer_run")
  69. @web_try()
  70. @sxtimeit
  71. def trigger_af_data_transfer_job_run(source_tb: str, target_tb: str):
  72. rets = call_airflow_api(method='post', uri=f'dags/dag_0/dagRuns',
  73. args_dict={"json": {"conf": {"input": source_tb, "output": target_tb}}}).json()
  74. return {"af_run_id": rets.get('dag_run_id', None),
  75. "dag_id": "dag_0"}
  76. @router_af_job.post("/special_job/install_requirements")
  77. @web_try()
  78. @sxtimeit
  79. def trigger_af_requirements_install_job_run(requirements_str: str, target_path: str):
  80. rets = call_airflow_api(method='post', uri=f'dags/dag_-1/dagRuns',
  81. args_dict={"json": {"conf": {"requirements_str": requirements_str, "target_path": target_path}}}).json()
  82. return {"af_run_id": rets.get('dag_run_id', None),
  83. "dag_id": "dag_-1"}
  84. @router_af_job.delete("/{item_id}")
  85. @web_try()
  86. @sxtimeit
  87. def delete_af_job(item_id: int, db: Session = Depends(get_db)):
  88. uri_prefix, headers = get_airflow_api_info()
  89. uri = f'{uri_prefix}/dags/dag_{item_id}'
  90. requests.delete(uri, headers=headers)
  91. os.remove(get_job_path(job_id=item_id))
  92. crud.delete_airflow_job(db, item_id)
  93. @router_af_job.patch("/{item_id}/pause/{pause}")
  94. @web_try()
  95. @sxtimeit
  96. def pause_af_job(pause: bool, item_id: int):
  97. uri_prefix, headers = get_airflow_api_info()
  98. url = f'{uri_prefix}/dags/dag_{item_id}'
  99. return requests.patch(url, headers=headers, data=json.dumps({"is_paused": pause})).json()