123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- import json
- import re
- import time
- from sqlalchemy.orm import Session
- from app import models, schemas
- from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
- from app.utils.cron_utils import joint_cron_expression
- from app.utils.send_util import get_job_run_status, get_task_log
- import app.crud as crud
- from configs.globals import g
- from constants.constants import RUN_STATUS
def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
    """Create a scheduled job on the Airflow side and in the local database.

    Args:
        db: Database session used for every query and write.
        item: Creation payload; it is converted with ``item.dict()`` and must
            provide at least ``cron_type``, ``name`` and ``tag``.

    Returns:
        The object returned by ``crud.create_jm_job_info`` for the new job.

    Raises:
        Exception: If a ``JmJobInfo`` row with the same name already exists in
            the current project (``g.project_id``) with ``delete_status != 0``.
    """
    now = int(time.time())
    payload = item.dict()

    # Turn the structured schedule object into a cron expression string.
    cron_spec = payload.pop('cron_expression', None)
    if payload['cron_type'] == 2 and cron_spec is not None:
        cron_text = joint_cron_expression(schemas.CronExpression(**cron_spec))
        payload['cron_select_type'] = cron_spec["cron_select_type"]
        payload['cron_expression'] = cron_text

    # Nodes and edges are not columns of the job row; split them off.
    nodes = payload.pop('nodes', None)
    edges = payload.pop('edges', None)

    # Reject a name already used by another job of this project.
    duplicate = (
        db.query(models.JmJobInfo)
        .filter(models.JmJobInfo.name == payload['name'])
        .filter(models.JmJobInfo.project_id == g.project_id)
        .filter(models.JmJobInfo.delete_status != 0)
        .first()
    )
    if duplicate:
        raise Exception('定时任务名称已存在')

    # Store the tag.
    crud.find_and_update(db, '任务标签', payload['tag'])

    record_fields = {
        **payload,
        'status': 0,
        'delete_status': 1,
        'create_time': now,
        'update_time': now,
        'user_name': g.user_name,
        'user_id': g.user_id,
        'project_id': g.project_id,
    }
    job_record = models.JmJobInfo(**record_fields)

    # Create the job on the Airflow side first ...
    af_job = jm_job_create_job(job_record, nodes, edges, db)
    # ... then persist it locally.
    job_record = crud.create_jm_job_info(db, job_record)
    # Create the job's nodes and the edges between them.
    create_jm_job_node(db, nodes, edges, job_record.id)
    # Record the relation between the local job and the Airflow job.
    crud.create_relation(db, job_record.id, 'job', af_job['id'])
    return job_record
def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
    """Update a scheduled job on the Airflow side and in the local database.

    Only the fields explicitly set on ``item`` are applied
    (``item.dict(exclude_unset=True)``).

    Args:
        db: Database session used for every query and write.
        item: Update payload; must provide at least ``id``, ``cron_type``,
            ``name`` and ``tag``.

    Returns:
        The object returned by ``crud.update_jm_job_info`` for the job.

    Raises:
        Exception: If no ``JmJobInfo`` row has the given id, or if another
            job in the current project (``g.project_id``) with
            ``delete_status != 0`` already uses the requested name.
    """
    jm_job_info_update = item.dict(exclude_unset=True)

    # Turn the structured schedule object into a cron expression string.
    # The payload may omit ``cron_expression`` (unset fields are excluded), so
    # only convert when one was actually sent -- same guard as the create
    # path. Otherwise the stored expression is left as it is.
    cron_expression_item = jm_job_info_update.pop('cron_expression', None)
    if jm_job_info_update['cron_type'] == 2 and cron_expression_item is not None:
        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
        cron_select_type = cron_expression_item["cron_select_type"]
        jm_job_info_update.update({
            'cron_select_type': cron_select_type,
            'cron_expression': cron_expression,
        })

    # Nodes and edges are not columns of the job row; split them off.
    nodes = jm_job_info_update.pop('nodes', None)
    edges = jm_job_info_update.pop('edges', None)

    db_item = (
        db.query(models.JmJobInfo)
        .filter(models.JmJobInfo.id == jm_job_info_update['id'])
        .first()
    )
    if not db_item:
        raise Exception('未找到该定时任务')

    # Reject a name already used by a different job of this project.
    db_name_item = (
        db.query(models.JmJobInfo)
        .filter(models.JmJobInfo.name == jm_job_info_update['name'])
        .filter(models.JmJobInfo.project_id == g.project_id)
        .filter(models.JmJobInfo.delete_status != 0)
        .filter(models.JmJobInfo.id != item.id)
        .first()
    )
    if db_name_item:
        raise Exception('定时任务名称已存在')

    # Store the tag.
    tag = jm_job_info_update['tag']
    crud.find_and_update(db, '任务标签', tag)

    jm_job_info_update.update({
        'update_time': int(time.time()),
        'user_name': g.user_name,
        'user_id': g.user_id,
        'project_id': g.project_id,
    })
    for k, v in jm_job_info_update.items():
        setattr(db_item, k, v)

    # Update the job on the Airflow side (its return value is not needed).
    jm_job_update_job(db_item, nodes, edges, db)
    # Update the job locally.
    db_item = crud.update_jm_job_info(db, db_item)
    # Drop the previous nodes, then recreate nodes and edges from the payload.
    crud.delete_job_node(db, db_item.id)
    create_jm_job_node(db, nodes, edges, db_item.id)
    return db_item
def create_jm_job_node(db: Session, nodes, edges, job_id):
    """Persist the nodes of a job and the edges connecting them.

    Args:
        db: Database session passed through to the ``crud`` helpers.
        nodes: Iterable of dicts with keys ``id``, ``homework_id`` and
            ``homework_name``; ``None`` or empty means nothing is created.
        edges: Iterable of dicts with keys ``source`` and ``target`` that
            refer to node ``id`` values; ``None`` or empty means no edges.
        job_id: Id of the job the nodes and edges belong to.

    Returns:
        None.

    Raises:
        KeyError: If an edge refers to a node ``id`` that is not in ``nodes``.
    """
    if not nodes:
        return

    # Maps the caller-supplied node id to the id of the persisted node row.
    uuid_node_id = {}
    for node in nodes:
        uuid = node['id']
        node_item = models.JmJobNode(
            job_id=job_id,
            homework_id=node['homework_id'],
            homework_name=node['homework_name'],
            start_point=1,
        )
        node_item = crud.create_jm_job_node(db, node_item)
        uuid_node_id[uuid] = node_item.id

    # Guard on ``edges`` here: callers pass ``None`` when the payload carries
    # no edges, and iterating ``None`` would raise after the nodes were
    # already created.
    if not edges:
        return

    for edge in edges:
        edge_item = models.JmJobEdge(
            job_id=job_id,
            in_node_id=uuid_node_id[edge['source']],
            out_node_id=uuid_node_id[edge['target']],
        )
        crud.create_jm_job_edge(db, edge_item)
    return
def update_jm_job_status_services(db: Session, job_id: int, status: int):
    """Switch a job on or off remotely and store the new status locally.

    Args:
        db: Database session passed through to the ``crud`` helpers.
        job_id: Id of the local job.
        status: Requested status; the value ``1`` additionally requires
            ``get_requirements_status_by_job_id`` to return a truthy result.

    Returns:
        Whatever ``crud.update_jm_job_status`` returns.

    Raises:
        Exception: If ``status`` is 1 and the requirements check fails.
    """
    switching_on = status == 1
    if switching_on and not get_requirements_status_by_job_id(db, job_id):
        raise Exception('依赖未安装完成,不可开启')

    af_id = crud.get_af_id(db, job_id, 'job').af_id
    on_off_control(af_id, status)
    return crud.update_jm_job_status(db, job_id, status)
def execute_job_services(db: Session, jm_job_id: int):
    """Trigger execution of the Airflow job related to a local job.

    Args:
        db: Database session passed to ``crud.get_af_id``.
        jm_job_id: Id of the local job.

    Returns:
        The result of ``execute_job`` for the related ``af_id``.
    """
    af_id = crud.get_af_id(db, jm_job_id, 'job').af_id
    return execute_job(af_id)
def get_requirements_status_by_job_id(db: Session, job_id: int):
    """Tell whether every "Dag" homework of a job passes the requirements check.

    Homeworks whose ``type`` is not ``"Dag"`` are skipped, as are homeworks
    for which ``crud.get_requirements_status`` returns a falsy relation.

    Args:
        db: Database session passed through to the ``crud`` helpers.
        job_id: Id of the job whose nodes are inspected.

    Returns:
        ``False`` as soon as a relation with ``status != 2`` is found,
        otherwise ``True``.
    """
    homework_ids = [node.homework_id for node in crud.get_one_job_nodes(db, job_id)]
    for homework in crud.get_jm_homeworks_by_ids(db, homework_ids):
        if homework.type != "Dag":
            continue
        relation = crud.get_requirements_status(db, homework.dag_uuid)
        if relation and relation.status != 2:
            return False
    return True
def _iter_log_task_ids(task):
    """Yield the ids under which the logs of *task* are looked up.

    A task of type ``'sparks'`` yields one ``"<task id>_<sub node id>"`` per
    entry of the ``sub_nodes`` list in its JSON ``script``; any other task
    yields its own ``id``.
    """
    if task['task_type'] == 'sparks':
        for node in json.loads(task['script'])['sub_nodes']:
            yield f'{task["id"]}_{node["id"]}'
    else:
        yield task["id"]


def get_all_timeout_jobs_services(db: Session, timeout: int):
    """Collect application ids found in the logs of long-running job runs.

    Args:
        db: Database session passed through to the ``crud`` helpers.
        timeout: Age threshold in hours; it is converted to a Unix-time
            cutoff of ``now - timeout * 3600`` seconds.

    Returns:
        A list of dicts with keys ``application_id``, ``start_time`` and
        ``desc``, one per task log from which an application id was parsed.
    """
    # The cutoff must reach the query: previously it was overwritten with the
    # current time, which left ``timeout`` unused.
    # NOTE(review): presumably ``crud.get_all_running_airflow_runs`` returns
    # runs started before this timestamp -- confirm against the crud layer.
    cutoff_time = int(time.time()) - (timeout * 3600)
    timeout_job = []

    for af_run in crud.get_all_running_airflow_runs(db, cutoff_time):
        run_status = af_run.status
        # Refresh the status remotely unless the stored one is already 2 or 3.
        if run_status not in (2, 3):
            run_status = get_job_run_status(str(af_run.id))['data']['status']
        # Runs whose status is -1, 2 or 3 are not inspected any further.
        if run_status in (-1, 2, 3):
            continue

        af_job = crud.get_airflow_job_once(db, af_run.job_id)
        for task in af_job.tasks:
            for task_id in _iter_log_task_ids(task):
                log_res = get_task_log(str(af_job.id), af_run.af_run_id, task_id)
                r_id = parsing_log_data_application_id(log_res['data'])
                if r_id:
                    timeout_job.append({
                        'application_id': r_id,
                        'start_time': af_run.start_time,
                        'desc': af_job.desc,
                    })
    return timeout_job
def parsing_log_data_application_id(log_data):
    """Extract an ``application_<digits>_<digits>`` id from task log data.

    Args:
        log_data: Mapping that may contain a ``status`` and a ``log`` entry.

    Returns:
        The first matching application id in ``log``, or ``None`` when
        ``status`` or ``log`` is missing/falsy, when ``RUN_STATUS[status]`` is
        neither 0 nor 1, or when the log contains no match.
    """
    status = log_data.get('status')
    log = log_data.get('log')

    # Both values are required; bail out before consulting RUN_STATUS.
    if not (status and log):
        return None
    if RUN_STATUS[status] not in (0, 1):
        return None

    found = re.search(r'(application_[0-9]{1,20}_[0-9]{1,20})', log)
    return found.group() if found else None
|