from typing import List, Optional

from sqlalchemy.orm import Session

from app import models, schemas
from app.crud import update_to_db


def create_airflow_run(db: Session, item: schemas.AirflowRunCreate):
    """Insert a new ``AirflowRun`` row built from *item* and return it.

    The ORM object is created from ``item.dict()``, added to the session,
    committed, and then refreshed from the session before being returned.
    Its ``to_dict()`` output is printed before and after the commit.
    """
    db_item = models.AirflowRun(**item.dict())
    # NOTE(review): these prints look like debugging output; consider routing
    # them through ``logging`` once it is confirmed nobody relies on stdout.
    print(f'create_airflow_run-before:{db_item.to_dict()}')
    db.add(db_item)
    db.commit()
    # Reload the row so attributes reflect the committed database state.
    db.refresh(db_item)
    print(f'create_airflow_run-after:{db_item.to_dict()}')
    return db_item


def update_airflow_run(db: Session, item_id: int,
                       update_item: schemas.AirflowRunUpdate):
    """Update the ``AirflowRun`` identified by *item_id*.

    Delegates entirely to ``update_to_db`` with ``models.AirflowRun`` as the
    model class and returns whatever that helper returns.
    """
    return update_to_db(
        update_item=update_item,
        item_id=item_id,
        db=db,
        model_cls=models.AirflowRun,
    )


def get_airflow_runs(db: Session) -> "List[models.AirflowRun]":
    """Return every ``AirflowRun`` row (no filtering, no explicit ordering)."""
    return db.query(models.AirflowRun).all()


def get_airflow_runs_by_af_job_ids(
        db: Session,
        job_ids: List[int],
        start: Optional[int] = None,
        end: Optional[int] = None,
) -> "List[models.AirflowRun]":
    """Return the runs whose ``job_id`` is in *job_ids*.

    Args:
        db: Session used to run the query.
        job_ids: Job ids to match against ``AirflowRun.job_id``.
        start: Slice start passed to ``Query.slice``; paging is applied only
            when both *start* and *end* are given.
        end: Slice end passed to ``Query.slice``.

    Returns:
        All matching rows (no explicit ordering) when either bound is
        ``None``; otherwise the ``start``..``end`` slice of the matches
        ordered by ``start_time`` descending.
    """
    matching = db.query(models.AirflowRun) \
        .filter(models.AirflowRun.job_id.in_(job_ids))

    # Paging needs both bounds; with only one (or none) return everything.
    if start is None or end is None:
        return matching.all()

    return matching \
        .order_by(models.AirflowRun.start_time.desc()) \
        .slice(start, end) \
        .all()


def get_airflow_run_once_normal_mode(
        db: Session, job_id, af_run_id: str,
) -> "Optional[models.AirflowRun]":
    """Return the first run matching both *af_run_id* and *job_id*.

    Returns ``None`` when no row matches.
    """
    return db.query(models.AirflowRun).filter(
        models.AirflowRun.af_run_id == af_run_id,
        models.AirflowRun.job_id == job_id,
    ).first()


def get_airflow_run_once_debug_mode(
        db: Session, job_id: int,
) -> "Optional[models.AirflowRun]":
    """Return the first run whose ``job_id`` equals *job_id*.

    Returns ``None`` when no row matches. No ordering is applied, so when a
    job has several runs the row returned is whichever the database yields
    first.
    """
    return db.query(models.AirflowRun) \
        .filter(models.AirflowRun.job_id == job_id) \
        .first()


def get_airflow_run_once(
        db: Session, item_id: int,
) -> "Optional[models.AirflowRun]":
    """Return the run whose ``id`` equals *item_id*, or ``None`` if absent."""
    return db.query(models.AirflowRun) \
        .filter(models.AirflowRun.id == item_id) \
        .first()


def count_airflow_runs_by_job_ids(db: Session, job_ids: List[int]):
    """Return the number of runs whose ``job_id`` is in *job_ids*."""
    return db.query(models.AirflowRun) \
        .filter(models.AirflowRun.job_id.in_(job_ids)) \
        .count()


def get_all_running_airflow_runs(
        db: Session, start_time: int,
) -> "List[models.AirflowRun]":
    """Return runs with status 0 or 1 that started at or before *start_time*.

    Args:
        db: Session used to run the query.
        start_time: Upper bound (inclusive) compared against
            ``AirflowRun.start_time``.
    """
    # NOTE(review): statuses 0 and 1 are presumably the "not finished yet"
    # codes given the function name; confirm against the status definition.
    return db.query(models.AirflowRun) \
        .filter(models.AirflowRun.status.in_([0, 1])) \
        .filter(models.AirflowRun.start_time <= start_time) \
        .all()