import json
import re
import time

from sqlalchemy.orm import Session

from app import models, schemas
from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
from app.utils.cron_utils import joint_cron_expression
from app.utils.send_util import get_job_run_status, get_task_log
import app.crud as crud
from configs.globals import g
from constants.constants import RUN_STATUS


def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
    """Create a scheduled multi-homework job.

    Validates name uniqueness within the current project, converts the
    structured cron object to a cron expression, persists the job record,
    its node/edge graph, and the relation to the airflow-side job.

    Raises:
        Exception: if a non-deleted job with the same name already exists.
    """
    create_time = int(time.time())
    jm_job_info_create = item.dict()
    # Convert the structured cron object into a cron expression string
    # (cron_type == 2 means "custom schedule" here — presumably; confirm against schema).
    cron_expression_item = jm_job_info_create.pop('cron_expression', None)
    if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
        cron_select_type = cron_expression_item["cron_select_type"]
        jm_job_info_create.update({
            'cron_select_type': cron_select_type,
            'cron_expression': cron_expression,
        })
    # Nodes and edges are stored in their own tables, not on the job record.
    nodes = jm_job_info_create.pop('nodes', None)
    edges = jm_job_info_create.pop('edges', None)
    db_item = db.query(models.JmJobInfo)\
        .filter(models.JmJobInfo.name == jm_job_info_create['name'])\
        .filter(models.JmJobInfo.project_id == g.project_id)\
        .filter(models.JmJobInfo.delete_status != 0).first()
    if db_item:
        raise Exception('定时任务名称已存在')
    # Persist the job tag into the shared tag dictionary.
    tag = jm_job_info_create['tag']
    crud.find_and_update(db, '任务标签', tag)
    jm_job_info_dict = dict(jm_job_info_create)
    jm_job_info_dict.update({
        'status': 0,
        'delete_status': 1,
        'create_time': create_time,
        'update_time': create_time,
        'user_name': g.user_name,
        'user_id': g.user_id,
        'project_id': g.project_id
    })
    jm_job_info = models.JmJobInfo(**jm_job_info_dict)
    # Create the airflow-side job first; if it fails, nothing is written locally.
    af_job = jm_job_create_job(jm_job_info, nodes, edges, db)
    # Then create the local record.
    jm_job_info = crud.create_jm_job_info(db, jm_job_info)
    # Persist the node/edge graph for this job.
    create_jm_job_node(db, nodes, edges, jm_job_info.id)
    # Link the local job to its airflow counterpart.
    crud.create_relation(db, jm_job_info.id, 'job', af_job['id'])
    return jm_job_info


def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
    """Update an existing scheduled job and rebuild its node/edge graph.

    Raises:
        Exception: if the job does not exist, or another non-deleted job
            in the project already uses the requested name.
    """
    jm_job_info_update = item.dict(exclude_unset=True)
    cron_expression_item = jm_job_info_update.pop('cron_expression', None)
    # FIX: the original indexed ['cron_type'] (KeyError when unset, since the
    # dict is built with exclude_unset=True) and did not check that
    # cron_expression_item is present — the create path guards both.
    if jm_job_info_update.get('cron_type') == 2 and cron_expression_item is not None:
        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
        cron_select_type = cron_expression_item["cron_select_type"]
        jm_job_info_update.update({
            'cron_select_type': cron_select_type,
            'cron_expression': cron_expression,
        })
    # Nodes and edges are stored in their own tables, not on the job record.
    nodes = jm_job_info_update.pop('nodes', None)
    edges = jm_job_info_update.pop('edges', None)
    db_item = db.query(models.JmJobInfo)\
        .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
    if not db_item:
        raise Exception('未找到该定时任务')
    # Name must stay unique among other non-deleted jobs in the project.
    db_name_item = db.query(models.JmJobInfo)\
        .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
        .filter(models.JmJobInfo.project_id == g.project_id)\
        .filter(models.JmJobInfo.delete_status != 0)\
        .filter(models.JmJobInfo.id != item.id).first()
    if db_name_item:
        raise Exception('定时任务名称已存在')
    # Persist the job tag into the shared tag dictionary.
    tag = jm_job_info_update['tag']
    crud.find_and_update(db, '任务标签', tag)
    jm_job_info_update.update({
        'update_time': int(time.time()),
        'user_name': g.user_name,
        'user_id': g.user_id,
        'project_id': g.project_id
    })
    for k, v in jm_job_info_update.items():
        setattr(db_item, k, v)
    # Propagate the change to the airflow-side job before saving locally.
    af_job = jm_job_update_job(db_item, nodes, edges, db)
    db_item = crud.update_jm_job_info(db, db_item)
    # Rebuild the graph: drop old nodes/edges, then recreate from the payload.
    crud.delete_job_node(db, db_item.id)
    create_jm_job_node(db, nodes, edges, db_item.id)
    return db_item


def create_jm_job_node(db: Session, nodes, edges, job_id):
    """Persist the node and edge rows of a job's DAG.

    ``nodes``/``edges`` are the front-end graph payload; edge endpoints
    reference node UUIDs, which are remapped to the generated DB ids.
    """
    uuid_node_id = {}
    if not nodes:
        return
    for node in nodes:
        uuid = node['id']
        node_item = models.JmJobNode(**{
            'job_id': job_id,
            'homework_id': node['homework_id'],
            'homework_name': node['homework_name'],
            'start_point': 1,
        })
        node_item = crud.create_jm_job_node(db, node_item)
        # Remember uuid -> db id so edges can be rewired below.
        uuid_node_id[uuid] = node_item.id
    # FIX: the original re-checked ``nodes`` here; with edges absent the
    # following loop would raise TypeError. Guard ``edges`` instead.
    if not edges:
        return
    for edge in edges:
        edge_item = models.JmJobEdge(**{
            'job_id': job_id,
            'in_node_id': uuid_node_id[edge['source']],
            'out_node_id': uuid_node_id[edge['target']]
        })
        crud.create_jm_job_edge(db, edge_item)
    return


def update_jm_job_status_services(db: Session, job_id: int, status: int):
    """Switch a job on/off, mirroring the state to the airflow side.

    Raises:
        Exception: when enabling a job whose homework requirements are
            not fully installed.
    """
    if status == 1:
        # A job may only be enabled once all its homework requirements
        # have finished installing.
        requirements_status = get_requirements_status_by_job_id(db, job_id)
        if not requirements_status:
            raise Exception('依赖未安装完成,不可开启')
    job_relation = crud.get_af_id(db, job_id, 'job')
    on_off_control(job_relation.af_id, status)
    return crud.update_jm_job_status(db, job_id, status)


def execute_job_services(db: Session, jm_job_id: int):
    """Trigger an immediate run of the airflow job linked to ``jm_job_id``."""
    relation = crud.get_af_id(db, jm_job_id, 'job')
    return execute_job(relation.af_id)


def get_requirements_status_by_job_id(db: Session, job_id: int):
    """Return True iff every Dag-type homework of the job has its
    requirements installed (status == 2)."""
    nodes = crud.get_one_job_nodes(db, job_id)
    homeworks = crud.get_jm_homeworks_by_ids(db, [node.homework_id for node in nodes])
    for homework in homeworks:
        # Only Dag-type homeworks carry a requirements record.
        relation = crud.get_requirements_status(db, homework.dag_uuid) if homework.type == "Dag" else None
        if relation and relation.status != 2:
            return False
    return True


def get_all_timeout_jobs_services(db: Session, timeout: int):
    """Collect Spark application ids of runs started before the timeout cutoff
    that are still not in a terminal state.

    Args:
        timeout: age threshold in hours.

    Returns:
        list of dicts with ``application_id``, ``start_time`` and ``desc``.
    """
    # FIX: the original recomputed ``current_time = int(time.time())`` right
    # after this line, which made the ``timeout`` parameter dead code.
    current_time = int(time.time()) - (timeout * 3600)
    timeout_job = []
    af_runs = crud.get_all_running_airflow_runs(db, current_time)
    for af_run in af_runs:
        run_status = af_run.status
        # 2/3 are terminal locally; otherwise ask airflow for the live status.
        if run_status not in [2, 3]:
            res = get_job_run_status(str(af_run.id))
            run_status = res['data']['status']
        if run_status not in [-1, 2, 3]:
            af_job = crud.get_airflow_job_once(db, af_run.job_id)
            for task in af_job.tasks:
                if task['task_type'] == 'sparks':
                    # A sparks task fans out into sub-nodes, each with its own log.
                    for node in json.loads(task['script'])['sub_nodes']:
                        task_id = f'{task["id"]}_{node["id"]}'
                        log_res = get_task_log(str(af_job.id), af_run.af_run_id, task_id)
                        r_id = parsing_log_data_application_id(log_res['data'])
                        if r_id:
                            timeout_job.append({'application_id': r_id,
                                                'start_time': af_run.start_time,
                                                'desc': af_job.desc})
                else:
                    task_id = task["id"]
                    log_res = get_task_log(str(af_job.id), af_run.af_run_id, task_id)
                    r_id = parsing_log_data_application_id(log_res['data'])
                    if r_id:
                        timeout_job.append({'application_id': r_id,
                                            'start_time': af_run.start_time,
                                            'desc': af_job.desc})
    return timeout_job


def parsing_log_data_application_id(log_data):
    """Extract the YARN application id from a task-log payload.

    Returns the first ``application_<ts>_<seq>`` token found in the log,
    or None when the run is not in a pending/running state or no id is
    present.
    """
    application_id = None
    status = log_data.get('status')
    log = log_data.get('log')
    # Only still-active runs (RUN_STATUS maps to 0/1) are of interest.
    if status and log and RUN_STATUS[status] in [0, 1]:
        res = re.search(r'(application_[0-9]{1,20}_[0-9]{1,20})', log)
        if res:
            application_id = res.group()
    return application_id