12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- from app.crud import update_to_db
- def create_airflow_run(db: Session, item: schemas.AirflowRunCreate):
- db_item = models.AirflowRun(**item.dict())
- print(f'create_airflow_run-before:{db_item.to_dict()}')
- db.add(db_item)
- db.commit()
- db.refresh(db_item)
- print(f'create_airflow_run-after:{db_item.to_dict()}')
- return db_item
- def update_airflow_run(db: Session, item_id: int, update_item: schemas.AirflowRunUpdate):
- return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowRun)
- def get_airflow_runs(db: Session):
- res: List[models.AirflowRun] = db.query(models.AirflowRun).all()
- return res
- def get_airflow_runs_by_af_job_ids(db: Session, job_ids: List[int], start: int = None, end: int = None):
- res: List[models.AirflowRun] = []
- if start is None or end is None:
- res = db.query(models.AirflowRun) \
- .filter(models.AirflowRun.job_id.in_(job_ids)).all()
- else:
- res = db.query(models.AirflowRun) \
- .filter(models.AirflowRun.job_id.in_(job_ids))\
- .order_by(models.AirflowRun.start_time.desc())\
- .slice(start,end).all()
- return res
- def get_airflow_run_once_normal_mode(db: Session, job_id, af_run_id: str):
- res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.af_run_id == af_run_id,
- models.AirflowRun.job_id == job_id).first()
- return res
- def get_airflow_run_once_debug_mode(db: Session, job_id: int):
- res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.job_id == job_id).first()
- return res
- def get_airflow_run_once(db: Session, item_id: int):
- res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.id == item_id).first()
- return res
- def count_airflow_runs_by_job_ids(db: Session, job_ids: List[int]):
- count = db.query(models.AirflowRun) \
- .filter(models.AirflowRun.job_id.in_(job_ids)).count()
- return count
- def get_all_running_airflow_runs(db: Session, start_time: int):
- res: List[models.AirflowRun] = db.query(models.AirflowRun)\
- .filter(models.AirflowRun.status.in_([0,1]))\
- .filter(models.AirflowRun.start_time<=start_time).all()
- return res
|