job_info.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import time
  2. from typing import List
  3. from app import models, schemas
  4. from sqlalchemy.orm import Session
  5. def create_job_info(db: Session, item: schemas.JobInfoCreate):
  6. create_time: int = int(time.time())
  7. db_item = models.JobInfo(**item.dict(), **{
  8. 'trigger_status': 0,
  9. 'create_time': create_time,
  10. 'update_time': create_time,
  11. 'user_id': 1, # TODO 用户
  12. 'trigger_status': 0,
  13. 'delete_status': 1,
  14. })
  15. db.add(db_item)
  16. db.commit()
  17. db.refresh(db_item)
  18. return db_item
  19. def get_job_infos(db: Session, skip: int = 0, limit: int = 20):
  20. res: List[models.JobInfo] = db.query(models.JobInfo).filter(models.JobInfo.delete_status == 1).all() # TODO: 排序
  21. return res
  22. def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate):
  23. db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
  24. if not db_item:
  25. raise Exception('未找到该任务')
  26. update_dict = update_item.dict(exclude_unset=True)
  27. for k, v in update_dict.items():
  28. setattr(db_item, k, v)
  29. db_item.update_time = int(time.time())
  30. db.commit()
  31. db.flush()
  32. db.refresh(db_item)
  33. return db_item
  34. def delete_job_info(db: Session, job_id: int):
  35. job_item = db.query(models.JobInfo).filter(models.JobInfo.id == job_id).first()
  36. if not job_item:
  37. raise Exception('未找到该任务')
  38. job_item.delete_status = 0
  39. db.commit()
  40. db.flush()
  41. db.refresh(job_item)
  42. return job_item