jm_job_info.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import time
  2. from sqlalchemy.orm import Session
  3. from app import models, schemas
  4. from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
  5. from app.utils.cron_utils import joint_cron_expression
  6. import app.crud as crud
  7. from configs.globals import g
  def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
      """Create a scheduled job together with its nodes, edges and relation row.

      Args:
          db: Database session used for every query and write.
          item: Creation payload; converted to a plain dict via ``item.dict()``.

      Returns:
          The job record returned by ``crud.create_jm_job_info``.

      Raises:
          Exception: If a job with the same name already exists in the current
              project (``g.project_id``) with a ``delete_status`` other than 0.
      """
      now = int(time.time())
      payload = item.dict()

      # Turn the structured schedule object into a cron expression string.
      cron_spec = payload.pop('cron_expression', None)
      uses_cron = payload['cron_type'] == 2
      if uses_cron and cron_spec is not None:
          expression = joint_cron_expression(schemas.CronExpression(**cron_spec))
          payload['cron_select_type'] = cron_spec["cron_select_type"]
          payload['cron_expression'] = expression

      # Nodes and edges are stored separately from the job row itself.
      nodes = payload.pop('nodes', None)
      edges = payload.pop('edges', None)

      duplicate = (
          db.query(models.JmJobInfo)
          .filter(models.JmJobInfo.name == payload['name'])
          .filter(models.JmJobInfo.project_id == g.project_id)
          .filter(models.JmJobInfo.delete_status != 0)
          .first()
      )
      if duplicate:
          raise Exception('定时任务名称已存在')

      # Persist the tag.
      crud.find_and_update(db, '任务标签', payload['tag'])

      record_fields = {
          **payload,
          'status': 0,
          'delete_status': 1,
          'create_time': now,
          'update_time': now,
          'user_name': g.user_name,
          'user_id': g.user_id,
          'project_id': g.project_id,
      }
      job_record = models.JmJobInfo(**record_fields)

      # Create the airflow-side job first, then the local-side job.
      airflow_job = jm_job_create_job(job_record, nodes, edges, db)
      job_record = crud.create_jm_job_info(db, job_record)

      # Create the job's nodes and the edges between them.
      create_jm_job_node(db, nodes, edges, job_record.id)

      # Link the local job to the airflow job.
      crud.create_relation(db, job_record.id, 'job', airflow_job['id'])
      return job_record
  def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
      """Update a scheduled job and rebuild its nodes and edges.

      The payload is read with ``exclude_unset=True``, so only fields the caller
      actually supplied are present. Optional fields (``cron_type``,
      ``cron_expression``, ``name``, ``tag``) are therefore handled only when
      they appear in the payload.

      Args:
          db: Database session used for every query and write.
          item: Update payload; ``item.id`` selects the job to modify.

      Returns:
          The job record returned by ``crud.update_jm_job_info``.

      Raises:
          Exception: If no job with ``item.id`` exists, or if another job in the
              current project (``g.project_id``) with a ``delete_status`` other
              than 0 already uses the requested name.
      """
      jm_job_info_update = item.dict(exclude_unset=True)

      # Turn the structured schedule object into a cron expression string.
      # Guard against a missing schedule object, as the create path does;
      # unpacking None into CronExpression would raise TypeError.
      cron_expression_item = jm_job_info_update.pop('cron_expression', None)
      if jm_job_info_update.get('cron_type') == 2 and cron_expression_item is not None:
          cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
          jm_job_info_update.update({
              'cron_select_type': cron_expression_item["cron_select_type"],
              'cron_expression': cron_expression,
          })

      # Nodes and edges are stored separately from the job row itself.
      # NOTE(review): when the payload omits them, both are None and the
      # existing nodes are still deleted below — confirm this is intended.
      nodes = jm_job_info_update.pop('nodes', None)
      edges = jm_job_info_update.pop('edges', None)

      db_item = (
          db.query(models.JmJobInfo)
          .filter(models.JmJobInfo.id == item.id)
          .first()
      )
      if not db_item:
          raise Exception('未找到该定时任务')

      # The uniqueness check only matters when a name was supplied.
      if 'name' in jm_job_info_update:
          db_name_item = (
              db.query(models.JmJobInfo)
              .filter(models.JmJobInfo.name == jm_job_info_update['name'])
              .filter(models.JmJobInfo.project_id == g.project_id)
              .filter(models.JmJobInfo.delete_status != 0)
              .filter(models.JmJobInfo.id != item.id)
              .first()
          )
          if db_name_item:
              raise Exception('定时任务名称已存在')

      # Persist the tag, if one was supplied.
      if 'tag' in jm_job_info_update:
          crud.find_and_update(db, '任务标签', jm_job_info_update['tag'])

      jm_job_info_update.update({
          'update_time': int(time.time()),
          'user_name': g.user_name,
          'user_id': g.user_id,
          'project_id': g.project_id,
      })
      for k, v in jm_job_info_update.items():
          setattr(db_item, k, v)

      # Update the airflow-side job first, then the local-side job.
      jm_job_update_job(db_item, nodes, edges, db)
      db_item = crud.update_jm_job_info(db, db_item)

      # Drop the previous nodes, then create the new nodes and edges.
      crud.delete_job_node(db, db_item.id)
      create_jm_job_node(db, nodes, edges, db_item.id)
      return db_item
  def create_jm_job_node(db: Session, nodes, edges, job_id):
      """Persist the nodes of a job and the edges connecting them.

      Each node dict is read for ``id``, ``homework_id`` and ``homework_name``;
      each edge dict is read for ``source`` and ``target``, which must match the
      ``id`` of one of the given nodes.

      Args:
          db: Database session passed through to the ``crud`` helpers.
          nodes: Iterable of node dicts, or None/empty to do nothing.
          edges: Iterable of edge dicts, or None/empty to create nodes only.
          job_id: Id of the job the nodes and edges belong to.

      Returns:
          None.

      Raises:
          KeyError: If an edge references a node id not present in ``nodes``.
      """
      if not nodes:
          return

      # Maps the caller-supplied node id to the id of the stored node row.
      uuid_node_id = {}
      for node in nodes:
          node_item = crud.create_jm_job_node(db, models.JmJobNode(**{
              'job_id': job_id,
              'homework_id': node['homework_id'],
              'homework_name': node['homework_name'],
              'start_point': 1,
          }))
          uuid_node_id[node['id']] = node_item.id

      # The original re-checked `nodes` here, so edges=None crashed the loop.
      if not edges:
          return

      for edge in edges:
          crud.create_jm_job_edge(db, models.JmJobEdge(**{
              'job_id': job_id,
              'in_node_id': uuid_node_id[edge['source']],
              'out_node_id': uuid_node_id[edge['target']],
          }))
      return
  def update_jm_job_status_services(db: Session, job_id: int, status: int):
      """Switch a job on or off remotely, then store the new status locally.

      Args:
          db: Database session passed through to the ``crud`` helpers.
          job_id: Id of the local job.
          status: Status value forwarded to ``on_off_control`` and stored.

      Returns:
          Whatever ``crud.update_jm_job_status`` returns.
      """
      # Resolve the linked af_id through the 'job' relation row.
      af_id = crud.get_af_id(db, job_id, 'job').af_id
      on_off_control(af_id, status)
      updated = crud.update_jm_job_status(db, job_id, status)
      return updated
  def execute_job_services(db: Session, jm_job_id: int):
      """Trigger execution of the job linked to the given local job id.

      Args:
          db: Database session passed through to ``crud.get_af_id``.
          jm_job_id: Id of the local job.

      Returns:
          Whatever ``execute_job`` returns for the linked ``af_id``.
      """
      # Resolve the linked af_id through the 'job' relation row.
      af_id = crud.get_af_id(db, jm_job_id, 'job').af_id
      return execute_job(af_id)
  129. return res