jm_job_info.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import time
  2. from typing import List
  3. from app import models, schemas
  4. from sqlalchemy.orm import Session
  5. from app.crud.constant import find_and_update
  6. from app.utils.cron_utils import *
  7. def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
  8. jm_job_info_create = item.dict()
  9. cron_expression_item = jm_job_info_create.pop('cron_expression', None)
  10. if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
  11. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  12. cron_select_type = cron_expression_item["cron_select_type"]
  13. jm_job_info_create.update({
  14. 'cron_select_type': cron_select_type,
  15. 'cron_expression': cron_expression,
  16. })
  17. nodes = jm_job_info_create.pop('nodes', None)
  18. edges = jm_job_info_create.pop('edges', None)
  19. db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
  20. .filter(models.JmJobInfo.delete_status != 0).first()
  21. if db_item:
  22. raise Exception('定时任务名称已存在')
  23. tag = jm_job_info_create['tag']
  24. find_and_update(db, '任务标签', tag)
  25. jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
  26. 'status': 0,
  27. 'delete_status': 1,
  28. })
  29. db.add(jm_job_info)
  30. db.commit()
  31. db.refresh(jm_job_info)
  32. return jm_job_info,nodes,edges
  33. def get_jm_job_infos(db: Session):
  34. res: List[models.JmJobInfo] = db.query(models.JmJobInfo)\
  35. .filter(models.JmJobInfo.delete_status != 0)\
  36. .order_by(models.JmJobInfo.id.desc()).all()
  37. return res
  38. def get_jm_job_info(db: Session, jm_job_id: int):
  39. item = db.query(models.JmJobInfo)\
  40. .filter(models.JmJobInfo.id == jm_job_id)\
  41. .filter(models.JmJobInfo.delete_status != 0).first()
  42. if not item:
  43. raise Exception('未找到该定时任务')
  44. return item
  45. def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
  46. jm_job_info_update = item.dict(exclude_unset=True)
  47. cron_expression_item = jm_job_info_update.pop('cron_expression', None)
  48. if jm_job_info_update['cron_type'] == 2:
  49. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  50. cron_select_type = cron_expression_item["cron_select_type"]
  51. jm_job_info_update.update({
  52. 'cron_select_type': cron_select_type,
  53. 'cron_expression': cron_expression,
  54. })
  55. nodes = jm_job_info_update.pop('nodes', None)
  56. edges = jm_job_info_update.pop('edges', None)
  57. db_item = db.query(models.JmJobInfo)\
  58. .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
  59. if not db_item:
  60. raise Exception('未找到该定时任务')
  61. db_name_item = db.query(models.JmJobInfo)\
  62. .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
  63. .filter(models.JmJobInfo.delete_status != 0)\
  64. .filter(models.JmJobInfo.id != item.id).first()
  65. if db_name_item:
  66. raise Exception('定时任务名称已存在')
  67. tag = jm_job_info_update['tag']
  68. find_and_update(db, '任务标签', tag)
  69. for k, v in jm_job_info_update.items():
  70. setattr(db_item, k, v)
  71. db.commit()
  72. db.flush()
  73. db.refresh(db_item)
  74. return db_item,nodes,edges
  75. def delete_jm_job_info(db: Session, jm_job_id: int):
  76. jm_job_info = db.query(models.JmJobInfo)\
  77. .filter(models.JmJobInfo.id == jm_job_id).first()
  78. if not jm_job_info:
  79. raise Exception('未找到该定时任务')
  80. jm_job_info.delete_status = 0
  81. db.commit()
  82. db.flush()
  83. db.refresh(jm_job_info)
  84. return jm_job_info
  85. def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
  86. jm_job_info = db.query(models.JmJobInfo)\
  87. .filter(models.JmJobInfo.id == item.id)\
  88. .filter(models.JmJobInfo.delete_status != 0).first()
  89. if not jm_job_info:
  90. raise Exception('未找到该定时任务')
  91. jm_job_info.status = item.status
  92. db.commit()
  93. db.flush()
  94. db.refresh(jm_job_info)
  95. return jm_job_info