jm_job_info.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from sqlalchemy.orm import Session
  2. from app import models, schemas
  3. from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
  4. from app.utils.cron_utils import joint_cron_expression
  5. import app.crud as crud
  6. def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
  7. jm_job_info_create = item.dict()
  8. # 定时对象转为cron表达式
  9. cron_expression_item = jm_job_info_create.pop('cron_expression', None)
  10. if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
  11. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  12. cron_select_type = cron_expression_item["cron_select_type"]
  13. jm_job_info_create.update({
  14. 'cron_select_type': cron_select_type,
  15. 'cron_expression': cron_expression,
  16. })
  17. # 节点与边的剥离
  18. nodes = jm_job_info_create.pop('nodes', None)
  19. edges = jm_job_info_create.pop('edges', None)
  20. db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
  21. .filter(models.JmJobInfo.delete_status != 0).first()
  22. if db_item:
  23. raise Exception('定时任务名称已存在')
  24. # 标签的存储
  25. tag = jm_job_info_create['tag']
  26. crud.find_and_update(db, '任务标签', tag)
  27. jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
  28. 'status': 0,
  29. 'delete_status': 1,
  30. })
  31. # 创建airflow端任务
  32. af_job = jm_job_create_job(jm_job_info,nodes,edges,db)
  33. # 创建local端任务
  34. jm_job_info = crud.create_jm_job_info(db,jm_job_info)
  35. # 创建多作业节点与节点关系
  36. create_jm_job_node(db, nodes, edges, jm_job_info.id)
  37. # 创建关系
  38. crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
  39. return jm_job_info
  40. def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
  41. jm_job_info_update = item.dict(exclude_unset=True)
  42. # 定时对象转为cron表达式
  43. cron_expression_item = jm_job_info_update.pop('cron_expression', None)
  44. if jm_job_info_update['cron_type'] == 2:
  45. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  46. cron_select_type = cron_expression_item["cron_select_type"]
  47. jm_job_info_update.update({
  48. 'cron_select_type': cron_select_type,
  49. 'cron_expression': cron_expression,
  50. })
  51. # 节点与边的剥离
  52. nodes = jm_job_info_update.pop('nodes', None)
  53. edges = jm_job_info_update.pop('edges', None)
  54. db_item = db.query(models.JmJobInfo)\
  55. .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
  56. if not db_item:
  57. raise Exception('未找到该定时任务')
  58. db_name_item = db.query(models.JmJobInfo)\
  59. .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
  60. .filter(models.JmJobInfo.delete_status != 0)\
  61. .filter(models.JmJobInfo.id != item.id).first()
  62. if db_name_item:
  63. raise Exception('定时任务名称已存在')
  64. # 标签的存储
  65. tag = jm_job_info_update['tag']
  66. crud.find_and_update(db, '任务标签', tag)
  67. for k, v in jm_job_info_update.items():
  68. setattr(db_item, k, v)
  69. # 修改airflow端任务
  70. af_job = jm_job_update_job(db_item,nodes,edges,db)
  71. # 修改local端任务
  72. db_item = crud.update_jm_job_info(db,db_item)
  73. # 删除之前的作业节点并创建新作业节点与节点关系
  74. crud.delete_job_node(db, db_item.id)
  75. create_jm_job_node(db, nodes, edges, db_item.id)
  76. return db_item
  77. def create_jm_job_node(db: Session, nodes, edges, job_id):
  78. uuid_node_id = {}
  79. if nodes is None or len(nodes) == 0:
  80. return
  81. for node in nodes:
  82. uuid = node['id']
  83. node_item = models.JmJobNode(**{
  84. 'job_id': job_id,
  85. 'homework_id': node['homework_id'],
  86. 'homework_name': node['homework_name'],
  87. 'start_point': 1,
  88. })
  89. node_item = crud.create_jm_job_node(db,node_item)
  90. node_id = node_item.id
  91. uuid_node_id.update({uuid:node_id})
  92. if nodes is None or len(nodes) == 0:
  93. return
  94. for edge in edges:
  95. edge_item = models.JmJobEdge(**{
  96. 'job_id': job_id,
  97. 'in_node_id': uuid_node_id[edge['source']],
  98. 'out_node_id': uuid_node_id[edge['target']]
  99. })
  100. edge = crud.create_jm_job_edge(db,edge_item)
  101. return
  102. def update_jm_job_status_services(db: Session, job_id: int, status: int):
  103. job_relation = crud.get_af_id(db,job_id,'job')
  104. on_off_control(job_relation.af_id, status)
  105. return crud.update_jm_job_status(db,job_id,status)
  106. def execute_job_services(db: Session, jm_job_id: int):
  107. relation = crud.get_af_id(db, jm_job_id, 'job')
  108. res = execute_job(relation.af_id)
  109. return res