jm_job_info.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import time
  2. from typing import List
  3. from app import models, schemas
  4. from sqlalchemy.orm import Session
  5. from app.crud.constant import find_and_update
  6. from app.utils.cron_utils import *
  7. from app.services.jm_job import jm_job_submit
  8. def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
  9. jm_job_info_create = item.dict()
  10. cron_expression_item = jm_job_info_create.pop('cron_expression', None)
  11. if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
  12. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  13. cron_select_type = cron_expression_item["cron_select_type"]
  14. jm_job_info_create.update({
  15. 'cron_select_type': cron_select_type,
  16. 'cron_expression': cron_expression,
  17. })
  18. nodes = jm_job_info_create.pop('nodes', None)
  19. edges = jm_job_info_create.pop('edges', None)
  20. db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
  21. .filter(models.JmJobInfo.delete_status != 0).first()
  22. if db_item:
  23. raise Exception('定时任务名称已存在')
  24. tag = jm_job_info_create['tag']
  25. find_and_update(db, '任务标签', tag)
  26. jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
  27. 'status': 0,
  28. 'delete_status': 1,
  29. })
  30. db.add(jm_job_info)
  31. db.commit()
  32. db.refresh(jm_job_info)
  33. jm_job_submit(jm_job_info, db)
  34. return jm_job_info,nodes,edges
  35. def get_jm_job_infos(db: Session):
  36. res: List[models.JmJobInfo] = db.query(models.JmJobInfo)\
  37. .filter(models.JmJobInfo.delete_status != 0)\
  38. .order_by(models.JmJobInfo.id.desc()).all()
  39. return res
  40. def get_jm_job_info(db: Session, jm_job_id: int):
  41. item = db.query(models.JmJobInfo)\
  42. .filter(models.JmJobInfo.id == jm_job_id)\
  43. .filter(models.JmJobInfo.delete_status != 0).first()
  44. if not item:
  45. raise Exception('未找到该定时任务')
  46. return item
  47. def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
  48. jm_job_info_update = item.dict(exclude_unset=True)
  49. cron_expression_item = jm_job_info_update.pop('cron_expression', None)
  50. if jm_job_info_update['cron_type'] == 2:
  51. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  52. cron_select_type = cron_expression_item["cron_select_type"]
  53. jm_job_info_update.update({
  54. 'cron_select_type': cron_select_type,
  55. 'cron_expression': cron_expression,
  56. })
  57. nodes = jm_job_info_update.pop('nodes', None)
  58. edges = jm_job_info_update.pop('edges', None)
  59. db_item = db.query(models.JmJobInfo)\
  60. .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
  61. if not db_item:
  62. raise Exception('未找到该定时任务')
  63. db_name_item = db.query(models.JmJobInfo)\
  64. .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
  65. .filter(models.JmJobInfo.delete_status != 0)\
  66. .filter(models.JmJobInfo.id != item.id).first()
  67. if db_name_item:
  68. raise Exception('定时任务名称已存在')
  69. tag = jm_job_info_update['tag']
  70. find_and_update(db, '任务标签', tag)
  71. for k, v in jm_job_info_update.items():
  72. setattr(db_item, k, v)
  73. db.commit()
  74. db.flush()
  75. db.refresh(db_item)
  76. jm_job_submit(db_item,db)
  77. return db_item,nodes,edges
  78. def delete_jm_job_info(db: Session, jm_job_id: int):
  79. jm_job_info = db.query(models.JmJobInfo)\
  80. .filter(models.JmJobInfo.id == jm_job_id).first()
  81. if not jm_job_info:
  82. raise Exception('未找到该定时任务')
  83. if jm_job_info.status == 1:
  84. raise Exception('该任务未停用,不能删除')
  85. jm_job_info.delete_status = 0
  86. db.commit()
  87. db.flush()
  88. db.refresh(jm_job_info)
  89. return jm_job_info
  90. def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
  91. jm_job_info = db.query(models.JmJobInfo)\
  92. .filter(models.JmJobInfo.id == item.id)\
  93. .filter(models.JmJobInfo.delete_status != 0).first()
  94. if not jm_job_info:
  95. raise Exception('未找到该定时任务')
  96. jm_job_info.status = item.status
  97. jm_job_submit(jm_job_info,db)
  98. db.commit()
  99. db.flush()
  100. db.refresh(jm_job_info)
  101. return jm_job_info