123456789101112131415161718192021222324252627282930313233343536373839404142 |
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- import time
- from app.core.airflow.job import AirflowJobSubmitter
- from app.crud import update_to_db
- def create_airflow_job(db: Session, item: schemas.AirflowJobCreate):
- db_item = models.AirflowJob(**item.dict(), **{"create_time": int(time.time()), "update_time": int(time.time())})
- db.add(db_item)
- db.commit()
- db.refresh(db_item)
- return db_item
- def get_airflow_jobs(db: Session):
- res: List[models.AirflowJob] = db.query(models.AirflowJob).all()
- return res
- def get_airflow_job_once(db: Session, item_id: int):
- res: models.AirflowJob = db.query(models.AirflowJob).filter(models.AirflowJob.id == item_id).first()
- return res
- def update_airflow_job(db: Session, item_id: int, update_item: schemas.AirflowJobUpdate):
- return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowJob)
- def delete_airflow_job(db: Session, item_id: int):
- item = get_airflow_job_once(item_id=item_id, db=db)
- if not item:
- raise Exception(f"delete failed, job {item.id} not found")
- db.delete(item)
- db.commit()
- db.flush()
- def create_airflow_job_submit(item: schemas.AirflowJob):
- AirflowJobSubmitter.submit_dag(item)
|