job_info.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. import time
  2. from typing import List
  3. from app import models, schemas
  4. from sqlalchemy.orm import Session
  5. from app.services.datax import datax_create_job
  6. from app.utils.cron_utils import *
  7. def create_job_info(db: Session, item: models.JobInfo):
  8. db.add(item)
  9. db.commit()
  10. db.refresh(item)
  11. return item
  12. def get_job_infos(db: Session):
  13. res: List[models.JobInfo] = db.query(models.JobInfo).filter(models.JobInfo.delete_status == 1).all() # TODO: 排序
  14. return res
  15. def update_job_info(db: Session, id: int, update_item: models.JobInfo):
  16. db.commit()
  17. db.flush()
  18. db.refresh(update_item)
  19. return update_item
  20. def update_job_trigger_status(db: Session, id: int, trigger_status: int):
  21. db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
  22. if not db_item:
  23. raise Exception('未找到该任务')
  24. db_item.trigger_status = trigger_status
  25. db_item.update_time = int(time.time())
  26. db.commit()
  27. db.flush()
  28. db.refresh(db_item)
  29. return db_item
  30. def get_job_info(db: Session, id: int):
  31. db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
  32. if not db_item:
  33. raise Exception('未找到该任务')
  34. return db_item
  35. def delete_job_info(db: Session, job_id: int):
  36. job_item = db.query(models.JobInfo).filter(models.JobInfo.id == job_id).first()
  37. if not job_item:
  38. raise Exception('未找到该任务')
  39. if job_item.trigger_status == 1:
  40. raise Exception('该任务未停用,不能删除')
  41. job_item.delete_status = 0
  42. db.commit()
  43. db.flush()
  44. db.refresh(job_item)
  45. return job_item