jm_job_info.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. import json
  2. import re
  3. import time
  4. from sqlalchemy.orm import Session
  5. from app import models, schemas
  6. from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
  7. from app.utils.cron_utils import joint_cron_expression
  8. import app.crud as crud
  9. from app.utils.send_util import get_job_run_status, get_task_log
  10. from configs.globals import g
  11. from constants.constants import RUN_STATUS
  def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
      """Create a multi-job (JmJobInfo), its Airflow-side job and its node/edge graph.

      Steps, in order:
        1. Normalise the schedule.
        2. Reject duplicate names in the current project.
        3. Store the tag.
        4. Create the Airflow job.
        5. Persist the local record.
        6. Persist nodes/edges.
        7. Link the local record to the Airflow job id.

      Args:
          db: Database session.
          item: Creation payload. ``cron_expression``, ``nodes`` and ``edges`` are
              split off before the ``models.JmJobInfo`` instance is built.

      Returns:
          The record returned by ``crud.create_jm_job_info``.

      Raises:
          Exception: '定时任务名称已存在' ("scheduled job name already exists") when a
              job with the same name and ``delete_status != 0`` exists in
              ``g.project_id``.
      """
      created_at = int(time.time())
      payload = item.dict()

      # Replace the structured schedule object with a cron expression string
      # (only for cron_type == 2 and when a schedule object was supplied).
      schedule = payload.pop('cron_expression', None)
      if payload['cron_type'] == 2 and schedule is not None:
          expression = joint_cron_expression(schemas.CronExpression(**schedule))
          payload['cron_select_type'] = schedule["cron_select_type"]
          payload['cron_expression'] = expression

      # The graph is persisted separately from the JmJobInfo row.
      nodes = payload.pop('nodes', None)
      edges = payload.pop('edges', None)

      duplicate = (
          db.query(models.JmJobInfo)
          .filter(
              models.JmJobInfo.name == payload['name'],
              models.JmJobInfo.project_id == g.project_id,
              models.JmJobInfo.delete_status != 0,
          )
          .first()
      )
      if duplicate:
          raise Exception('定时任务名称已存在')

      # Store the tag via crud.find_and_update ('任务标签' = "job tag").
      crud.find_and_update(db, '任务标签', payload['tag'])

      record = models.JmJobInfo(**{
          **payload,
          'status': 0,
          'delete_status': 1,
          'create_time': created_at,
          'update_time': created_at,
          'user_id': g.user_id,
          'project_id': g.project_id,
      })
      # Airflow-side job first, then the local row (its id is needed below).
      af_job = jm_job_create_job(record, nodes, edges, db)
      record = crud.create_jm_job_info(db, record)
      create_jm_job_node(db, nodes, edges, record.id)
      crud.create_relation(db, record.id, 'job', af_job['id'])
      return record
  def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
      """Update an existing multi-job (JmJobInfo), its Airflow-side job and its graph.

      Only fields explicitly set on ``item`` are applied (``exclude_unset=True``).

      Args:
          db: Database session.
          item: Update payload; must carry ``id``, ``name`` and ``tag``.

      Returns:
          The record returned by ``crud.update_jm_job_info``.

      Raises:
          Exception: '未找到该定时任务' ("scheduled job not found") when no record has
              ``item.id``.
          Exception: '定时任务名称已存在' ("scheduled job name already exists") when
              another non-deleted job (``delete_status != 0``) in ``g.project_id``
              already uses the name.
      """
      jm_job_info_update = item.dict(exclude_unset=True)

      # Convert the schedule object into a cron expression. Guard on the object
      # being present (as create_jm_job_info_services does): unpacking None would
      # raise TypeError. .get() avoids a KeyError when cron_type was not sent,
      # since exclude_unset=True drops unset fields.
      cron_expression_item = jm_job_info_update.pop('cron_expression', None)
      if jm_job_info_update.get('cron_type') == 2 and cron_expression_item is not None:
          cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
          cron_select_type = cron_expression_item["cron_select_type"]
          jm_job_info_update.update({
              'cron_select_type': cron_select_type,
              'cron_expression': cron_expression,
          })

      # Split off the graph; it is rebuilt separately below.
      nodes = jm_job_info_update.pop('nodes', None)
      edges = jm_job_info_update.pop('edges', None)

      db_item = db.query(models.JmJobInfo)\
          .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
      if not db_item:
          raise Exception('未找到该定时任务')

      db_name_item = db.query(models.JmJobInfo)\
          .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
          .filter(models.JmJobInfo.project_id == g.project_id)\
          .filter(models.JmJobInfo.delete_status != 0)\
          .filter(models.JmJobInfo.id != item.id).first()
      if db_name_item:
          raise Exception('定时任务名称已存在')

      # Store the tag via crud.find_and_update ('任务标签' = "job tag").
      tag = jm_job_info_update['tag']
      crud.find_and_update(db, '任务标签', tag)

      jm_job_info_update.update({
          'update_time': int(time.time()),
          'user_id': g.user_id,
          'project_id': g.project_id
      })
      for k, v in jm_job_info_update.items():
          setattr(db_item, k, v)

      # Update the Airflow-side job, then the local row.
      jm_job_update_job(db_item, nodes, edges, db)
      db_item = crud.update_jm_job_info(db, db_item)

      # Replace the previous graph with the submitted one.
      # NOTE(review): if `nodes` was not sent, the old nodes are deleted and none
      # are recreated — confirm that is intended.
      crud.delete_job_node(db, db_item.id)
      create_jm_job_node(db, nodes, edges, db_item.id)
      return db_item
  def create_jm_job_node(db: Session, nodes, edges, job_id):
      """Persist a multi-job's graph: one JmJobNode per node, one JmJobEdge per edge.

      Args:
          db: Database session.
          nodes: List of node dicts with ``id`` (client-side id), ``homework_id``
              and ``homework_name``. Nothing is written when empty or None.
          edges: List of edge dicts whose ``source``/``target`` are node ``id``
              values from ``nodes``. Edges are skipped when empty or None.
          job_id: Id of the owning JmJobInfo record.

      Raises:
          KeyError: if an edge references a node id not present in ``nodes``.
      """
      if not nodes:
          return

      # Map client-side node ids to the database ids of the created nodes so
      # edges can be stored against real node ids.
      uuid_node_id = {}
      for node in nodes:
          node_item = models.JmJobNode(
              job_id=job_id,
              homework_id=node['homework_id'],
              homework_name=node['homework_name'],
              start_point=1,
          )
          uuid_node_id[node['id']] = crud.create_jm_job_node(db, node_item).id

      # Guard on `edges` (not `nodes`): a graph with nodes but no edges is valid,
      # and iterating None would raise TypeError.
      if not edges:
          return

      for edge in edges:
          edge_item = models.JmJobEdge(
              job_id=job_id,
              in_node_id=uuid_node_id[edge['source']],
              out_node_id=uuid_node_id[edge['target']],
          )
          crud.create_jm_job_edge(db, edge_item)
  def update_jm_job_status_services(db: Session, job_id: int, status: int):
      """Switch a multi-job's status on the Airflow side and locally.

      When enabling (``status == 1``), the job's homework requirements must be
      satisfied first (see ``get_requirements_status_by_job_id``).

      Raises:
          Exception: '依赖未安装完成,不可开启' ("dependencies not fully installed,
              cannot enable") when enabling with unmet requirements.

      Returns:
          Result of ``crud.update_jm_job_status``.
      """
      if status == 1 and not get_requirements_status_by_job_id(db, job_id):
          raise Exception('依赖未安装完成,不可开启')
      af_id = crud.get_af_id(db, job_id, 'job').af_id
      on_off_control(af_id, status)
      return crud.update_jm_job_status(db, job_id, status)
  def execute_job_services(db: Session, jm_job_id: int):
      """Run the Airflow job linked to a multi-job via ``execute_job``.

      Returns:
          Whatever ``execute_job`` returns for the linked Airflow job id.
      """
      return execute_job(crud.get_af_id(db, jm_job_id, 'job').af_id)
  def get_requirements_status_by_job_id(db: Session, job_id: int):
      """Return whether every Dag-type homework in the job has acceptable requirements.

      Returns False as soon as a Dag homework has a requirements record whose
      ``status`` is not 2 (presumably "installed", given the caller's error
      message — confirm). Homeworks of other types, or with no record, pass.
      """
      homework_ids = [n.homework_id for n in crud.get_one_job_nodes(db, job_id)]
      for hw in crud.get_jm_homeworks_by_ids(db, homework_ids):
          if hw.type != "Dag":
              continue
          requirement = crud.get_requirements_status(db, hw.dag_uuid)
          if requirement and requirement.status != 2:
              return False
      return True
  def _timeout_application_entry(af_job, af_run, task_id):
      """Fetch one task's log; return a timeout record if it names an application id, else None."""
      log_res = get_task_log(str(af_job.id), af_run.af_run_id, task_id)
      r_id = parsing_log_data_application_id(log_res['data'])
      if not r_id:
          return None
      return {'application_id': r_id, 'start_time': af_run.start_time, 'desc': af_job.desc}


  def get_all_timeout_jobs_services(db: Session, timeout: int):
      """Collect application ids from task logs of runs older than ``timeout`` hours.

      Args:
          db: Database session.
          timeout: Threshold in hours. The cutoff ``now - timeout * 3600`` is passed
              to ``crud.get_all_running_airflow_runs`` (presumably it returns runs
              started before the cutoff — confirm against crud).

      Returns:
          List of dicts with keys ``application_id`` (a string matching
          ``application_<n>_<n>``), ``start_time`` and ``desc``.
      """
      # Previously this cutoff was immediately overwritten by the current time,
      # which silently ignored ``timeout``.
      cutoff_time = int(time.time()) - timeout * 3600
      timeout_job = []
      for af_run in crud.get_all_running_airflow_runs(db, cutoff_time):
          run_status = af_run.status
          if run_status not in [2, 3]:
              # Stored status is not 2/3: refresh it via get_job_run_status.
              res = get_job_run_status(str(af_run.id))
              run_status = res['data']['status']
          if run_status in [-1, 2, 3]:
              continue

          af_job = crud.get_airflow_job_once(db, af_run.job_id)
          for task in af_job.tasks:
              if task['task_type'] == 'sparks':
                  # A 'sparks' task logs per sub-node under "<task id>_<node id>".
                  sub_nodes = json.loads(task['script'])['sub_nodes']
                  task_ids = [f'{task["id"]}_{node["id"]}' for node in sub_nodes]
              else:
                  task_ids = [task["id"]]
              for task_id in task_ids:
                  entry = _timeout_application_entry(af_job, af_run, task_id)
                  if entry:
                      timeout_job.append(entry)
      return timeout_job
  def parsing_log_data_application_id(log_data):
      """Extract the first ``application_<n>_<n>`` id from a task-log payload.

      Args:
          log_data: Dict that may contain ``status`` and ``log``.

      Returns:
          The matched id string when both ``status`` and ``log`` are truthy and
          ``RUN_STATUS[status]`` is 0 or 1; otherwise None.
      """
      status = log_data.get('status')
      log = log_data.get('log')
      if not (status and log):
          return None
      if RUN_STATUS[status] not in [0, 1]:
          return None
      match = re.search(r'(application_[0-9]{1,20}_[0-9]{1,20})', log)
      return match.group() if match else None