af_run.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. from typing import List
  2. from app import models, schemas
  3. from sqlalchemy.orm import Session
  4. from app.crud import update_to_db
  5. def create_airflow_run(db: Session, item: schemas.AirflowRunCreate):
  6. db_item = models.AirflowRun(**item.dict())
  7. print(f'create_airflow_run-before:{db_item.to_dict()}')
  8. db.add(db_item)
  9. db.commit()
  10. db.refresh(db_item)
  11. print(f'create_airflow_run-after:{db_item.to_dict()}')
  12. return db_item
  13. def update_airflow_run(db: Session, item_id: int, update_item: schemas.AirflowRunUpdate):
  14. return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowRun)
  15. def get_airflow_runs(db: Session):
  16. res: List[models.AirflowRun] = db.query(models.AirflowRun).all()
  17. return res
  18. def get_airflow_runs_by_af_job_ids(db: Session, job_ids: List[int], start: int = None, end: int = None):
  19. res: List[models.AirflowRun] = []
  20. if start is None or end is None:
  21. res = db.query(models.AirflowRun) \
  22. .filter(models.AirflowRun.job_id.in_(job_ids)).all()
  23. else:
  24. res = db.query(models.AirflowRun) \
  25. .filter(models.AirflowRun.job_id.in_(job_ids))\
  26. .order_by(models.AirflowRun.start_time.desc())\
  27. .slice(start,end).all()
  28. return res
  29. def get_airflow_run_once_normal_mode(db: Session, job_id, af_run_id: str):
  30. res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.af_run_id == af_run_id,
  31. models.AirflowRun.job_id == job_id).first()
  32. return res
  33. def get_airflow_run_once_debug_mode(db: Session, job_id: int):
  34. res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.job_id == job_id).first()
  35. return res
  36. def get_airflow_run_once(db: Session, item_id: int):
  37. res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.id == item_id).first()
  38. return res
  39. def count_airflow_runs_by_job_ids(db: Session, job_ids: List[int]):
  40. count = db.query(models.AirflowRun) \
  41. .filter(models.AirflowRun.job_id.in_(job_ids)).count()
  42. return count
  43. def get_all_running_airflow_runs(db: Session, start_time: int):
  44. res: List[models.AirflowRun] = db.query(models.AirflowRun)\
  45. .filter(models.AirflowRun.status.in_([0,1]))\
  46. .filter(models.AirflowRun.start_time<=start_time).all()
  47. return res