jm_job_info.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import time
  2. from typing import List
  3. from app import models, schemas
  4. from sqlalchemy.orm import Session
  5. from app.crud.constant import find_and_update
  def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
      """Persist a new scheduled job and return it with its graph payload.

      ``nodes`` and ``edges`` are removed from the incoming payload before the
      model is built and are handed back to the caller unchanged.

      Args:
          db: Database session used for the lookup and the insert.
          item: Creation payload; must provide ``name`` and ``tag``.

      Returns:
          A ``(jm_job_info, nodes, edges)`` tuple, where ``jm_job_info`` is the
          refreshed, committed row.

      Raises:
          Exception: If a job with the same name exists whose
              ``delete_status`` is not 0.
      """
      payload = item.dict()
      nodes = payload.pop('nodes', None)
      edges = payload.pop('edges', None)

      # Names only have to be unique among jobs that are not soft-deleted.
      same_name_query = db.query(models.JmJobInfo).filter(
          models.JmJobInfo.name == payload['name'],
          models.JmJobInfo.delete_status != 0,
      )
      if same_name_query.first():
          raise Exception('定时任务名称已存在')

      # NOTE(review): presumably records the tag under the job-category
      # constant -- confirm against app.crud.constant.find_and_update.
      find_and_update(db, '任务分类', payload['tag'])

      # New rows always start with status=1 and delete_status=1.
      jm_job_info = models.JmJobInfo(**payload, status=1, delete_status=1)
      db.add(jm_job_info)
      db.commit()
      db.refresh(jm_job_info)
      return jm_job_info, nodes, edges
  def get_jm_job_infos(db: Session):
      """Return every job whose ``delete_status`` is not 0, highest id first."""
      not_deleted = db.query(models.JmJobInfo).filter(
          models.JmJobInfo.delete_status != 0
      )
      newest_first = not_deleted.order_by(models.JmJobInfo.id.desc())
      jobs: List[models.JmJobInfo] = newest_first.all()
      return jobs
  def get_jm_job_info(db: Session, jm_job_id: int):
      """Fetch a single job by id, ignoring rows whose ``delete_status`` is 0.

      Args:
          db: Database session used for the lookup.
          jm_job_id: Primary key of the job to fetch.

      Returns:
          The matching ``models.JmJobInfo`` row.

      Raises:
          Exception: If no matching, non-deleted job exists.
      """
      lookup = db.query(models.JmJobInfo).filter(
          models.JmJobInfo.id == jm_job_id,
          models.JmJobInfo.delete_status != 0,
      )
      job = lookup.first()
      if job:
          return job
      raise Exception('未找到该定时任务')
  def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
      """Apply a partial update to an existing, non-deleted scheduled job.

      Only fields explicitly set on ``item`` are written. ``nodes`` and
      ``edges`` are removed from the payload and returned to the caller
      unchanged instead of being set on the row.

      Args:
          db: Database session used for the lookup and the update.
          item: Update payload; ``id`` must be set.

      Returns:
          A ``(db_item, nodes, edges)`` tuple, where ``db_item`` is the
          refreshed, committed row.

      Raises:
          KeyError: If ``id`` was not set on ``item``.
          Exception: If no job with that id exists or it has been
              soft-deleted (``delete_status == 0``).
      """
      jm_job_info_update = item.dict(exclude_unset=True)
      nodes = jm_job_info_update.pop('nodes', None)
      edges = jm_job_info_update.pop('edges', None)

      # Soft-deleted rows are treated as missing, matching get_jm_job_info
      # and update_jm_job_status; previously a deleted job could be edited.
      db_item = db.query(models.JmJobInfo)\
          .filter(models.JmJobInfo.id == jm_job_info_update['id'])\
          .filter(models.JmJobInfo.delete_status != 0).first()
      if not db_item:
          raise Exception('未找到该定时任务')

      # NOTE(review): unlike create_jm_job_info, a rename is not checked
      # against existing job names -- confirm whether that is intended.
      for k, v in jm_job_info_update.items():
          setattr(db_item, k, v)

      # commit() flushes pending changes itself, so no separate flush() call.
      db.commit()
      db.refresh(db_item)
      return db_item, nodes, edges
  def delete_jm_job_info(db: Session, jm_job_id: int):
      """Soft-delete a scheduled job by setting its ``delete_status`` to 0.

      The row is kept in the table. The lookup does not filter on
      ``delete_status``, so calling this on an already-deleted job succeeds
      and returns the row again.

      Args:
          db: Database session used for the lookup and the update.
          jm_job_id: Primary key of the job to delete.

      Returns:
          The refreshed ``models.JmJobInfo`` row with ``delete_status`` 0.

      Raises:
          Exception: If no job with that id exists.
      """
      jm_job_info = db.query(models.JmJobInfo)\
          .filter(models.JmJobInfo.id == jm_job_id).first()
      if not jm_job_info:
          raise Exception('未找到该定时任务')

      jm_job_info.delete_status = 0
      # commit() flushes pending changes itself, so no separate flush() call.
      db.commit()
      db.refresh(jm_job_info)
      return jm_job_info
  def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
      """Set the ``status`` field of an existing, non-deleted scheduled job.

      Args:
          db: Database session used for the lookup and the update.
          item: Payload providing the job ``id`` and the new ``status``.

      Returns:
          The refreshed ``models.JmJobInfo`` row.

      Raises:
          Exception: If no job with that id exists or it has been
              soft-deleted (``delete_status == 0``).
      """
      jm_job_info = db.query(models.JmJobInfo)\
          .filter(models.JmJobInfo.id == item.id)\
          .filter(models.JmJobInfo.delete_status != 0).first()
      if not jm_job_info:
          raise Exception('未找到该定时任务')

      jm_job_info.status = item.status
      # commit() flushes pending changes itself, so no separate flush() call.
      db.commit()
      db.refresh(jm_job_info)
      return jm_job_info