af_run.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. from typing import List
  2. from app import models, schemas
  3. from sqlalchemy.orm import Session
  4. from app.crud import update_to_db
  5. def create_airflow_run(db: Session, item: schemas.AirflowRunCreate):
  6. db_item = models.AirflowRun(**item.dict())
  7. print(f'create_airflow_run-before:{db_item.to_dict()}')
  8. db.add(db_item)
  9. db.commit()
  10. db.refresh(db_item)
  11. print(f'create_airflow_run-after:{db_item.to_dict()}')
  12. return db_item
  13. def update_airflow_run(db: Session, item_id: int, update_item: schemas.AirflowRunUpdate):
  14. return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowRun)
  15. def get_airflow_runs(db: Session):
  16. res: List[models.AirflowRun] = db.query(models.AirflowRun).all()
  17. return res
  18. def get_airflow_runs_by_af_job_ids(db: Session, job_ids: List[int]):
  19. res: List[models.AirflowRun] = db.query(models.AirflowRun)\
  20. .filter(models.AirflowRun.job_id.in_(job_ids)).all()
  21. return res
  22. def get_airflow_run_once_normal_mode(db: Session, af_run_id: str):
  23. res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.af_run_id == af_run_id).first()
  24. return res
  25. def get_airflow_run_once_debug_mode(db: Session, job_id: int):
  26. res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.job_id == job_id).first()
  27. return res
  28. def get_airflow_run_once(db: Session, item_id: int):
  29. res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.id == item_id).first()
  30. return res