job_info.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import time
  2. from typing import List
  3. from app import models, schemas
  4. from sqlalchemy.orm import Session
  5. from app.utils.cron_utils import *
  6. def create_job_info(db: Session, item: schemas.JobInfoCreate):
  7. create_time: int = int(time.time())
  8. item_dict = item.dict()
  9. cron_expression_dict = item_dict.pop('cron_expression')
  10. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
  11. cron_select_type = cron_expression_dict["cron_select_type"]
  12. item_dict.update({
  13. 'cron_select_type': cron_select_type,
  14. 'job_cron': cron_expression,
  15. })
  16. partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() else None
  17. partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() else None
  18. partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() else None
  19. partition_info_str = ''
  20. if partition_info is not None and partition_time is not None and partition_num is not None:
  21. partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
  22. elif (partition_info is not None or partition_info != '') and (partition_time is None or partition_num is None):
  23. raise Exception('分区信息不完善')
  24. item_dict.update({
  25. 'partition_info': partition_info_str,
  26. })
  27. db_item = models.JobInfo(**item_dict, **{
  28. 'trigger_status': 0,
  29. 'create_time': create_time,
  30. 'update_time': create_time,
  31. 'delete_status': 1,
  32. })
  33. db.add(db_item)
  34. db.commit()
  35. db.refresh(db_item)
  36. return db_item
  37. def get_job_infos(db: Session, skip: int = 0, limit: int = 20):
  38. res: List[models.JobInfo] = db.query(models.JobInfo).filter(models.JobInfo.delete_status == 1).all() # TODO: 排序
  39. return res
  40. def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate):
  41. db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
  42. if not db_item:
  43. raise Exception('未找到该任务')
  44. update_dict = update_item.dict(exclude_unset=True)
  45. cron_expression_dict = update_dict.pop('cron_expression')
  46. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
  47. cron_select_type = cron_expression_dict["cron_select_type"]
  48. update_dict.update({
  49. 'cron_select_type': cron_select_type,
  50. 'job_cron': cron_expression,
  51. })
  52. partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() else None
  53. partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys() else None
  54. partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys() else None
  55. partition_info_str = ''
  56. if partition_info is not None and partition_time is not None and partition_num is not None:
  57. partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
  58. elif (partition_info is not None or partition_info != '') and (partition_time is None or partition_num is None):
  59. raise Exception('分区信息不完善')
  60. update_dict.update({
  61. 'partition_info': partition_info_str,
  62. })
  63. for k, v in update_dict.items():
  64. setattr(db_item, k, v)
  65. db_item.update_time = int(time.time())
  66. db.commit()
  67. db.flush()
  68. db.refresh(db_item)
  69. return db_item
  70. def update_job_trigger_status(db: Session, id: int, trigger_status: int):
  71. db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
  72. if not db_item:
  73. raise Exception('未找到该任务')
  74. db_item.trigger_status = trigger_status
  75. db_item.update_time = int(time.time())
  76. db.commit()
  77. db.flush()
  78. db.refresh(db_item)
  79. return db_item
  80. def get_job_info(db: Session, id: int):
  81. db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
  82. if not db_item:
  83. raise Exception('未找到该任务')
  84. return db_item
  85. def delete_job_info(db: Session, job_id: int):
  86. job_item = db.query(models.JobInfo).filter(models.JobInfo.id == job_id).first()
  87. if not job_item:
  88. raise Exception('未找到该任务')
  89. if job_item.trigger_status == 1:
  90. raise Exception('该任务未停用,不能删除')
  91. job_item.delete_status = 0
  92. db.commit()
  93. db.flush()
  94. db.refresh(job_item)
  95. return job_item