job_info.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. import time
  2. from typing import List
  3. from app import models, schemas
  4. from sqlalchemy.orm import Session
  5. from app.utils.cron_utils import *
  6. def create_job_info(db: Session, item: models.JobInfo):
  7. db.add(item)
  8. db.commit()
  9. db.refresh(item)
  10. return item
  11. def get_job_infos(db: Session):
  12. res: List[models.JobInfo] = db.query(models.JobInfo)\
  13. .filter(models.JobInfo.delete_status == 1)\
  14. .order_by(models.JobInfo.create_time.desc()).all() # TODO: 排序
  15. return res
  16. def update_job_info(db: Session, id: int, update_item: models.JobInfo):
  17. db.commit()
  18. db.flush()
  19. db.refresh(update_item)
  20. return update_item
  21. def update_job_trigger_status(db: Session, id: int, trigger_status: int):
  22. db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
  23. if not db_item:
  24. raise Exception('未找到该任务')
  25. db_item.trigger_status = trigger_status
  26. db_item.update_time = int(time.time())
  27. db.commit()
  28. db.flush()
  29. db.refresh(db_item)
  30. return db_item
  31. def get_job_info(db: Session, id: int):
  32. db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
  33. if not db_item:
  34. raise Exception('未找到该任务')
  35. return db_item
  36. def delete_job_info(db: Session, job_id: int):
  37. job_item = db.query(models.JobInfo).filter(models.JobInfo.id == job_id).first()
  38. if not job_item:
  39. raise Exception('未找到该任务')
  40. if job_item.trigger_status == 1:
  41. raise Exception('该任务未停用,不能删除')
  42. job_item.delete_status = 0
  43. db.commit()
  44. db.flush()
  45. db.refresh(job_item)
  46. return job_item