jm_job_info.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. import json
  2. import re
  3. import time
  4. from sqlalchemy.orm import Session
  5. from app import models, schemas
  6. from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
  7. from app.utils.cron_utils import joint_cron_expression
  8. from app.utils.send_util import get_job_run_status, get_task_log
  9. import app.crud as crud
  10. from configs.globals import g
  11. from constants.constants import RUN_STATUS
  12. def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
  13. create_time = int(time.time())
  14. jm_job_info_create = item.dict()
  15. # 定时对象转为cron表达式
  16. cron_expression_item = jm_job_info_create.pop('cron_expression', None)
  17. if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
  18. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  19. cron_select_type = cron_expression_item["cron_select_type"]
  20. jm_job_info_create.update({
  21. 'cron_select_type': cron_select_type,
  22. 'cron_expression': cron_expression,
  23. })
  24. # 节点与边的剥离
  25. nodes = jm_job_info_create.pop('nodes', None)
  26. edges = jm_job_info_create.pop('edges', None)
  27. db_item = db.query(models.JmJobInfo)\
  28. .filter(models.JmJobInfo.name == jm_job_info_create['name'])\
  29. .filter(models.JmJobInfo.project_id == g.project_id)\
  30. .filter(models.JmJobInfo.delete_status != 0).first()
  31. if db_item:
  32. raise Exception('定时任务名称已存在')
  33. # 标签的存储
  34. tag = jm_job_info_create['tag']
  35. crud.find_and_update(db, '任务标签', tag)
  36. jm_job_info_dict = {}
  37. jm_job_info_dict.update(jm_job_info_create)
  38. jm_job_info_dict.update({
  39. 'status': 0,
  40. 'delete_status': 1,
  41. 'create_time': create_time,
  42. 'update_time': create_time,
  43. 'user_name': g.user_name,
  44. 'user_id': g.user_id,
  45. 'project_id': g.project_id
  46. })
  47. jm_job_info = models.JmJobInfo(**jm_job_info_dict)
  48. # 创建airflow端任务
  49. af_job = jm_job_create_job(jm_job_info,nodes,edges,db)
  50. # 创建local端任务
  51. jm_job_info = crud.create_jm_job_info(db,jm_job_info)
  52. # 创建多作业节点与节点关系
  53. create_jm_job_node(db, nodes, edges, jm_job_info.id)
  54. # 创建关系
  55. crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
  56. return jm_job_info
  57. def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
  58. jm_job_info_update = item.dict(exclude_unset=True)
  59. # 定时对象转为cron表达式
  60. cron_expression_item = jm_job_info_update.pop('cron_expression', None)
  61. if jm_job_info_update['cron_type'] == 2:
  62. cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
  63. cron_select_type = cron_expression_item["cron_select_type"]
  64. jm_job_info_update.update({
  65. 'cron_select_type': cron_select_type,
  66. 'cron_expression': cron_expression,
  67. })
  68. # 节点与边的剥离
  69. nodes = jm_job_info_update.pop('nodes', None)
  70. edges = jm_job_info_update.pop('edges', None)
  71. db_item = db.query(models.JmJobInfo)\
  72. .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
  73. if not db_item:
  74. raise Exception('未找到该定时任务')
  75. db_name_item = db.query(models.JmJobInfo)\
  76. .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
  77. .filter(models.JmJobInfo.project_id == g.project_id)\
  78. .filter(models.JmJobInfo.delete_status != 0)\
  79. .filter(models.JmJobInfo.id != item.id).first()
  80. if db_name_item:
  81. raise Exception('定时任务名称已存在')
  82. # 标签的存储
  83. tag = jm_job_info_update['tag']
  84. crud.find_and_update(db, '任务标签', tag)
  85. jm_job_info_update.update({
  86. 'update_time': int(time.time()),
  87. 'user_name': g.user_name,
  88. 'user_id': g.user_id,
  89. 'project_id': g.project_id
  90. })
  91. for k, v in jm_job_info_update.items():
  92. setattr(db_item, k, v)
  93. # 修改airflow端任务
  94. af_job = jm_job_update_job(db_item,nodes,edges,db)
  95. # 修改local端任务
  96. db_item = crud.update_jm_job_info(db,db_item)
  97. # 删除之前的作业节点并创建新作业节点与节点关系
  98. crud.delete_job_node(db, db_item.id)
  99. create_jm_job_node(db, nodes, edges, db_item.id)
  100. return db_item
  101. def create_jm_job_node(db: Session, nodes, edges, job_id):
  102. uuid_node_id = {}
  103. if nodes is None or len(nodes) == 0:
  104. return
  105. for node in nodes:
  106. uuid = node['id']
  107. node_item = models.JmJobNode(**{
  108. 'job_id': job_id,
  109. 'homework_id': node['homework_id'],
  110. 'homework_name': node['homework_name'],
  111. 'start_point': 1,
  112. })
  113. node_item = crud.create_jm_job_node(db,node_item)
  114. node_id = node_item.id
  115. uuid_node_id.update({uuid:node_id})
  116. if nodes is None or len(nodes) == 0:
  117. return
  118. for edge in edges:
  119. edge_item = models.JmJobEdge(**{
  120. 'job_id': job_id,
  121. 'in_node_id': uuid_node_id[edge['source']],
  122. 'out_node_id': uuid_node_id[edge['target']]
  123. })
  124. edge = crud.create_jm_job_edge(db,edge_item)
  125. return
  126. def update_jm_job_status_services(db: Session, job_id: int, status: int):
  127. if status == 1:
  128. requirements_status = get_requirements_status_by_job_id(db,job_id)
  129. if not requirements_status:
  130. raise Exception('依赖未安装完成,不可开启')
  131. job_relation = crud.get_af_id(db,job_id,'job')
  132. on_off_control(job_relation.af_id, status)
  133. return crud.update_jm_job_status(db,job_id,status)
  134. def execute_job_services(db: Session, jm_job_id: int):
  135. relation = crud.get_af_id(db, jm_job_id, 'job')
  136. res = execute_job(relation.af_id)
  137. return res
  138. def get_requirements_status_by_job_id(db: Session, job_id: int):
  139. nodes = crud.get_one_job_nodes(db, job_id)
  140. homeworks = crud.get_jm_homeworks_by_ids(db, [node.homework_id for node in nodes])
  141. for homework in homeworks:
  142. relation = crud.get_requirements_status(db, homework.dag_uuid) if homework.type == "Dag" else None
  143. if relation and relation.status != 2:
  144. return False
  145. return True
  146. def get_all_timeout_jobs_services(db: Session, timeout: int):
  147. current_time = int(time.time()) - (timeout * 3600)
  148. timeout_job = []
  149. current_time = int(time.time())
  150. af_runs = crud.get_all_running_airflow_runs(db, current_time)
  151. for af_run in af_runs:
  152. run_status = af_run.status
  153. if run_status not in [2,3]:
  154. res = get_job_run_status(str(af_run.id))
  155. run_status = res['data']['status']
  156. if run_status not in [-1,2,3]:
  157. af_job = crud.get_airflow_job_once(db, af_run.job_id)
  158. tasks = af_job.tasks
  159. for task in tasks:
  160. if task['task_type'] == 'sparks':
  161. for node in json.loads(task['script'])['sub_nodes']:
  162. task_id = f'{task["id"]}_{node["id"]}'
  163. log_res = get_task_log(str(af_job.id),af_run.af_run_id,task_id)
  164. r_id = parsing_log_data_application_id(log_res['data'])
  165. if r_id: timeout_job.append({'application_id':r_id, 'start_time':af_run.start_time, 'desc': af_job.desc})
  166. else:
  167. task_id = task["id"]
  168. log_res = get_task_log(str(af_job.id),af_run.af_run_id,task_id)
  169. r_id = parsing_log_data_application_id(log_res['data'])
  170. if r_id: timeout_job.append({'application_id':r_id, 'start_time':af_run.start_time, 'desc': af_job.desc})
  171. return timeout_job
  172. def parsing_log_data_application_id(log_data):
  173. application_id = None
  174. status = log_data['status'] if 'status' in log_data.keys() else None
  175. log = log_data['log'] if 'log' in log_data.keys() else None
  176. if status and log and RUN_STATUS[status] in [0,1]:
  177. res = re.search(r'(application_[0-9]{1,20}_[0-9]{1,20})',log)
  178. if res: application_id = res.group()
  179. return application_id