job.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. import datetime
  2. import json
  3. import os
  4. import requests
  5. from fastapi import APIRouter, Depends
  6. from fastapi_pagination import paginate, Params
  7. from sqlalchemy.orm import Session
  8. from app import schemas, get_db, crud
  9. from app.core.airflow.uri import get_job_path, get_airflow_api_info
  10. from app.crud import create_airflow_job_submit
  11. from utils import web_try, sxtimeit
  12. router_af_job = APIRouter(
  13. prefix="/jpt/af_job",
  14. tags=["airflow_job-任务管理"],
  15. )
  16. @router_af_job.get("/")
  17. @web_try()
  18. @sxtimeit
  19. def get_af_jobs(params: Params = Depends(), db: Session = Depends(get_db)):
  20. return paginate(crud.get_airflow_jobs(db), params)
  21. @router_af_job.get("/{item_id}/last_parsed_time")
  22. @web_try()
  23. @sxtimeit
  24. def get_af_jobs(item_id: int):
  25. uri_prefix, headers = get_airflow_api_info()
  26. url = f'{uri_prefix}/dags/dag_{item_id}'
  27. rets = requests.get(url, headers=headers).json()
  28. if 'last_parsed_time' in rets:
  29. last_parsed_time_str = rets['last_parsed_time']
  30. last_parsed_time = datetime.datetime.strptime(last_parsed_time_str, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
  31. else:
  32. last_parsed_time= None
  33. return {"last_parsed_time": last_parsed_time}
  34. @router_af_job.get("/getOnce/{item_id}")
  35. @web_try()
  36. @sxtimeit
  37. def get_af_jobs_once(item_id: int, db: Session = Depends(get_db)):
  38. return crud.get_airflow_job_once(db, item_id)
  39. @router_af_job.post("/")
  40. @web_try()
  41. @sxtimeit
  42. def add_af_job(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
  43. return crud.create_airflow_job(db, item)
  44. @router_af_job.put("/{item_id}")
  45. @web_try()
  46. @sxtimeit
  47. def update_af_job(item_id: int, update_item: schemas.AirflowJobUpdate, db: Session = Depends(get_db)):
  48. return crud.update_airflow_job(db=db, item_id=item_id, update_item=update_item)
  49. @router_af_job.post("/submit")
  50. @web_try()
  51. @sxtimeit
  52. def add_dag_submit(id: int, db: Session = Depends(get_db)):
  53. item = crud.get_airflow_job_once(db, id)
  54. create_airflow_job_submit(schemas.AirflowJob(**item.to_dict()))
  55. @router_af_job.post("/{item_id}/run")
  56. @web_try()
  57. @sxtimeit
  58. def trigger_af_job_run(item_id: int, db: Session = Depends(get_db)):
  59. job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
  60. uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
  61. headers = {
  62. 'content-type': 'application/json',
  63. 'Authorization': 'basic YWRtaW46YWRtaW4=',
  64. 'Host': 'airflow-web.sxkj.com'
  65. }
  66. response = requests.post(uri, headers=headers, data=json.dumps({}))
  67. return response.json()
  68. #
  69. # @router_af_job.post("/{item_id}/update_and_run")
  70. # @web_try()
  71. # @sxtimeit
  72. # def update_and_trigger_job_run(item_id: int, db: Session = Depends(get_db)):
  73. # job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
  74. # uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
  75. # headers = {
  76. # 'content-type': 'application/json',
  77. # 'Authorization': 'basic YWRtaW46YWRtaW4=',
  78. # 'Host': 'airflow-web.sxkj.com'
  79. # }
  80. #
  81. # response = requests.post(uri, headers=headers, data=json.dumps({}))
  82. # return response.json()
  83. @router_af_job.delete("/{item_id}")
  84. @web_try()
  85. @sxtimeit
  86. def delete_af_job(item_id: int, db: Session = Depends(get_db)):
  87. uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}'
  88. headers = {
  89. 'content-type': 'application/json',
  90. 'Authorization': 'basic YWRtaW46YWRtaW4=',
  91. 'Host': 'airflow-web.sxkj.com'
  92. }
  93. requests.delete(uri, headers=headers)
  94. os.remove(get_job_path(job_id=item_id))
  95. crud.delete_airflow_job(db, item_id)
  96. @router_af_job.patch("/{item_id}/pause/{pause}")
  97. @web_try()
  98. @sxtimeit
  99. def pause_af_job(pause: bool, item_id: int):
  100. uri_prefix, headers = get_airflow_api_info()
  101. url = f'{uri_prefix}/dags/dag_{item_id}'
  102. return requests.patch(url, headers=headers, data=json.dumps({"is_paused": pause})).json()