# task_run.py — FastAPI routes for Airflow run management (job runs,
# task-completion notifications, and run signals).
  1. import json
  2. from fastapi import APIRouter, Depends
  3. from fastapi_pagination import paginate, Params
  4. from pydantic import BaseModel
  5. from sqlalchemy.orm import Session
  6. from app import schemas, get_db, crud
  7. from app.core.k8s.k8s_client import KubernetesTools
  8. from utils import web_try, sxtimeit
class Item(BaseModel):
    """Generic request wrapper: every endpoint receives an opaque payload dict."""
    # NOTE(review): the keys expected inside `data` vary per endpoint
    # (job_id/start_time/run_ts for run creation; dag_id/task_id/run_ts/
    # af_run_id/status for notifications) — no schema validation happens here.
    data: dict
  11. router_af_run = APIRouter(
  12. prefix="/jpt/jpt_run",
  13. tags=["airflow_run-运行管理"],
  14. )
  15. @router_af_run.get("/")
  16. @web_try()
  17. @sxtimeit
  18. def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
  19. return paginate(crud.get_airflow_tasks(db), params)
  20. @router_af_run.post("/")
  21. @web_try()
  22. @sxtimeit
  23. def add_airflow_run(item: Item, db: Session = Depends(get_db)):
  24. print(item.data)
  25. job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
  26. sparks_dependence = {}
  27. if job_item is not None:
  28. for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
  29. if task.task_type == 'sparks':
  30. sparks_info = json.loads(task.script)
  31. dependence = []
  32. for (source, sink) in sparks_info['edges']:
  33. dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
  34. sparks_dependence[task.id] = dependence
  35. item = schemas.AirflowRunCreate(**{"start_time": int(item.data["start_time"]),
  36. "job_id": item.data["job_id"],
  37. "run_id": item.data['run_ts'],
  38. "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
  39. "sparks": sparks_dependence}},
  40. "status": "1"},
  41. )
  42. crud.create_airflow_run(db, item)
  43. @router_af_run.post("/notification")
  44. @web_try()
  45. @sxtimeit
  46. def add_notification(item: Item, db: Session = Depends(get_db)):
  47. print(f'input : {item.data} ')
  48. k8s_tool = KubernetesTools()
  49. labels = {"dag_id": item.data['dag_id'],
  50. "task_id": item.data['task_id'],
  51. "run_ts": item.data['run_ts']}
  52. logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
  53. # run = crud.get_airflow_run_once_normal_mode(run_id=item.data['run_ts'], job_id=item.data["job_id"], db=db)
  54. run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
  55. if run is not None:
  56. update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
  57. update_run.details['tasks'][item.data['task_id']] = {"log": logs,
  58. "start_time": item.data["start_time"],
  59. "end_time": item.data["end_time"],
  60. "status": item.data['status']}
  61. crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
  62. return
  63. @router_af_run.post("/sigal")
  64. @web_try()
  65. @sxtimeit
  66. def add_notification(item: Item, db: Session = Depends(get_db)):
  67. print(f'receive sigal: {item.data} ')