af_task.py 903 B

123456789101112131415161718192021222324252627
  1. from typing import List
  2. from app import models, schemas
  3. from sqlalchemy.orm import Session
  4. from app.crud.basic import update_to_db
  5. import time
  6. def create_airflow_task(db: Session, item: schemas.AirflowTaskCreate):
  7. db_item = models.AirflowTask(**item.dict(), **{"create_time": int(time.time()), "update_time": int(time.time())})
  8. db.add(db_item)
  9. db.commit()
  10. db.refresh(db_item)
  11. return db_item
  12. def update_airflow_task(db: Session, item_id: int, update_item: schemas.AirflowTaskUpdate):
  13. return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowTask)
  14. def get_airflow_tasks(db: Session):
  15. res: List[models.AirflowTask] = db.query(models.AirflowTask).all()
  16. return res
  17. def get_airflow_task_once(db: Session, id: int):
  18. res: models.AirflowTask = db.query(models.AirflowTask).filter(models.AirflowTask.id == id).first()
  19. return res