jm_job_info.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import time
  2. from sqlalchemy.orm import Session
  3. from app import models, schemas
  4. from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
  5. from app.utils.cron_utils import joint_cron_expression
  6. import app.crud as crud
  7. from configs.globals import g
  8. def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
  9. create_time = int(time.time())
  10. jm_job_info_create = item.dict()
  11. # 定时对象转为cron表达式
  12. cron_expression_item = jm_job_info_create.pop('cron_expression', None)
  13. if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
  14. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  15. cron_select_type = cron_expression_item["cron_select_type"]
  16. jm_job_info_create.update({
  17. 'cron_select_type': cron_select_type,
  18. 'cron_expression': cron_expression,
  19. })
  20. # 节点与边的剥离
  21. nodes = jm_job_info_create.pop('nodes', None)
  22. edges = jm_job_info_create.pop('edges', None)
  23. db_item = db.query(models.JmJobInfo)\
  24. .filter(models.JmJobInfo.name == jm_job_info_create['name'])\
  25. .filter(models.JmJobInfo.project_id == g.project_id)\
  26. .filter(models.JmJobInfo.delete_status != 0).first()
  27. if db_item:
  28. raise Exception('定时任务名称已存在')
  29. # 标签的存储
  30. tag = jm_job_info_create['tag']
  31. crud.find_and_update(db, '任务标签', tag)
  32. jm_job_info_dict = {}
  33. jm_job_info_dict.update(jm_job_info_create)
  34. jm_job_info_dict.update({
  35. 'status': 0,
  36. 'delete_status': 1,
  37. 'create_time': create_time,
  38. 'update_time': create_time,
  39. 'user_id': g.user_id,
  40. 'project_id': g.project_id
  41. })
  42. jm_job_info = models.JmJobInfo(**jm_job_info_dict)
  43. # 创建airflow端任务
  44. af_job = jm_job_create_job(jm_job_info,nodes,edges,db)
  45. # 创建local端任务
  46. jm_job_info = crud.create_jm_job_info(db,jm_job_info)
  47. # 创建多作业节点与节点关系
  48. create_jm_job_node(db, nodes, edges, jm_job_info.id)
  49. # 创建关系
  50. crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
  51. return jm_job_info
  52. def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
  53. jm_job_info_update = item.dict(exclude_unset=True)
  54. # 定时对象转为cron表达式
  55. cron_expression_item = jm_job_info_update.pop('cron_expression', None)
  56. if jm_job_info_update['cron_type'] == 2:
  57. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  58. cron_select_type = cron_expression_item["cron_select_type"]
  59. jm_job_info_update.update({
  60. 'cron_select_type': cron_select_type,
  61. 'cron_expression': cron_expression,
  62. })
  63. # 节点与边的剥离
  64. nodes = jm_job_info_update.pop('nodes', None)
  65. edges = jm_job_info_update.pop('edges', None)
  66. db_item = db.query(models.JmJobInfo)\
  67. .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
  68. if not db_item:
  69. raise Exception('未找到该定时任务')
  70. db_name_item = db.query(models.JmJobInfo)\
  71. .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
  72. .filter(models.JmJobInfo.project_id == g.project_id)\
  73. .filter(models.JmJobInfo.delete_status != 0)\
  74. .filter(models.JmJobInfo.id != item.id).first()
  75. if db_name_item:
  76. raise Exception('定时任务名称已存在')
  77. # 标签的存储
  78. tag = jm_job_info_update['tag']
  79. crud.find_and_update(db, '任务标签', tag)
  80. jm_job_info_update.update({
  81. 'update_time': int(time.time()),
  82. 'user_id': g.user_id,
  83. 'project_id': g.project_id
  84. })
  85. for k, v in jm_job_info_update.items():
  86. setattr(db_item, k, v)
  87. # 修改airflow端任务
  88. af_job = jm_job_update_job(db_item,nodes,edges,db)
  89. # 修改local端任务
  90. db_item = crud.update_jm_job_info(db,db_item)
  91. # 删除之前的作业节点并创建新作业节点与节点关系
  92. crud.delete_job_node(db, db_item.id)
  93. create_jm_job_node(db, nodes, edges, db_item.id)
  94. return db_item
  95. def create_jm_job_node(db: Session, nodes, edges, job_id):
  96. uuid_node_id = {}
  97. if nodes is None or len(nodes) == 0:
  98. return
  99. for node in nodes:
  100. uuid = node['id']
  101. node_item = models.JmJobNode(**{
  102. 'job_id': job_id,
  103. 'homework_id': node['homework_id'],
  104. 'homework_name': node['homework_name'],
  105. 'start_point': 1,
  106. })
  107. node_item = crud.create_jm_job_node(db,node_item)
  108. node_id = node_item.id
  109. uuid_node_id.update({uuid:node_id})
  110. if nodes is None or len(nodes) == 0:
  111. return
  112. for edge in edges:
  113. edge_item = models.JmJobEdge(**{
  114. 'job_id': job_id,
  115. 'in_node_id': uuid_node_id[edge['source']],
  116. 'out_node_id': uuid_node_id[edge['target']]
  117. })
  118. edge = crud.create_jm_job_edge(db,edge_item)
  119. return
  120. def update_jm_job_status_services(db: Session, job_id: int, status: int):
  121. if status == 1:
  122. requirements_status = get_requirements_status_by_job_id(db,job_id)
  123. if not requirements_status:
  124. raise Exception('依赖未安装完成,不可开启')
  125. job_relation = crud.get_af_id(db,job_id,'job')
  126. on_off_control(job_relation.af_id, status)
  127. return crud.update_jm_job_status(db,job_id,status)
  128. def execute_job_services(db: Session, jm_job_id: int):
  129. relation = crud.get_af_id(db, jm_job_id, 'job')
  130. res = execute_job(relation.af_id)
  131. return res
  132. def get_requirements_status_by_job_id(db: Session, job_id: int):
  133. nodes = crud.get_one_job_nodes(db, job_id)
  134. homeworks = crud.get_jm_homeworks_by_ids(db, [node.homework_id for node in nodes])
  135. for homework in homeworks:
  136. relation = crud.get_requirements_status(db, homework.dag_uuid) if homework.type == "Dag" else None
  137. if relation and relation.status != 2:
  138. return False
  139. return True