import time
from typing import List, Optional

from sqlalchemy.orm import Session

from app import models, schemas
from app.core.airflow.job import AirflowJobSubmitter
from app.crud import update_to_db


def create_airflow_job(db: Session, item: schemas.AirflowJobCreate):
    """Insert a new ``AirflowJob`` row built from *item* and return it.

    The fields of ``item.dict()`` are passed to the model constructor together
    with ``create_time`` and ``update_time``, both set to the current Unix time
    in whole seconds. The row is committed and refreshed before being returned.
    """
    # Sample the clock once so both timestamps are guaranteed to be equal.
    now = int(time.time())
    db_item = models.AirflowJob(**item.dict(), create_time=now, update_time=now)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item


def get_airflow_jobs(db: Session) -> List[models.AirflowJob]:
    """Return every ``AirflowJob`` row as a list."""
    res: List[models.AirflowJob] = db.query(models.AirflowJob).all()
    return res


def get_airflow_job_once(db: Session, item_id: int) -> Optional[models.AirflowJob]:
    """Return the ``AirflowJob`` whose ``id`` equals *item_id*.

    Returns ``None`` when no row matches.
    """
    res: Optional[models.AirflowJob] = (
        db.query(models.AirflowJob)
        .filter(models.AirflowJob.id == item_id)
        .first()
    )
    return res


def update_airflow_job(db: Session, item_id: int, update_item: schemas.AirflowJobUpdate):
    """Delegate the update of job *item_id* to ``update_to_db``.

    Returns whatever ``update_to_db`` returns for the ``AirflowJob`` model.
    """
    return update_to_db(
        update_item=update_item,
        item_id=item_id,
        db=db,
        model_cls=models.AirflowJob,
    )


def delete_airflow_job(db: Session, item_id: int) -> None:
    """Delete the ``AirflowJob`` identified by *item_id* and commit.

    Raises:
        Exception: if no job with *item_id* exists.
    """
    item = get_airflow_job_once(item_id=item_id, db=db)
    if not item:
        # Report the requested id: ``item`` is None on this path, so reading
        # ``item.id`` would raise AttributeError instead of this error.
        raise Exception(f"delete failed, job {item_id} not found")
    db.delete(item)
    # commit() flushes pending changes itself; no separate flush is needed.
    db.commit()


def create_airflow_job_submit(item: schemas.AirflowJob) -> None:
    """Hand *item* to ``AirflowJobSubmitter.submit_dag``."""
    AirflowJobSubmitter.submit_dag(item)