af_job.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from typing import List
  2. from app import models, schemas
  3. from sqlalchemy.orm import Session
  4. import time
  5. from app.core.airflow.job import AirflowJobSubmitter
  6. from app.crud import update_to_db
  7. def create_airflow_job(db: Session, item: schemas.AirflowJobCreate):
  8. db_item = models.AirflowJob(**item.dict(), **{"create_time": int(time.time()), "update_time": int(time.time())})
  9. db.add(db_item)
  10. db.commit()
  11. db.refresh(db_item)
  12. return db_item
  13. def get_airflow_jobs(db: Session):
  14. res: List[models.AirflowJob] = db.query(models.AirflowJob).all()
  15. return res
  16. def get_airflow_job_once(db: Session, item_id: int):
  17. res: models.AirflowJob = db.query(models.AirflowJob).filter(models.AirflowJob.id == item_id).first()
  18. return res
  19. def update_airflow_job(db: Session, item_id: int, update_item: schemas.AirflowJobUpdate):
  20. return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowJob)
  21. def delete_airflow_job(db: Session, item_id: int):
  22. item = get_airflow_job_once(item_id=item_id, db=db)
  23. if not item:
  24. raise Exception(f"delete failed, job {item.id} not found")
  25. db.delete(item)
  26. db.commit()
  27. db.flush()
  28. def create_airflow_job_submit(item: schemas.AirflowJob):
  29. AirflowJobSubmitter.submit_dag(item)