"""Helpers that translate JM homework/job records into remote `/af` tasks and jobs.

The functions here build request payloads from ``models.JmHomework`` /
``models.JmJobInfo`` rows and push them through the ``send_*`` helpers
star-imported from ``app.utils.send_util``.
"""
# NOTE(review): `current_task` (asyncio) and `update` (turtle) are not referenced
# anywhere in this module and look like accidental IDE auto-imports. `turtle`
# imports tkinter, which may be unavailable on headless servers. They are kept
# here only because other code might rely on them - confirm and remove.
from asyncio import current_task
import json
import time
from turtle import update
from app import crud, models, schemas
from app.common import minio
from app.core.datasource.datasource import DataSourceBase
from app.crud.jm_homework_datasource_relation import get_jm_relations
from app.utils.send_util import *
from sqlalchemy.orm import Session
from app.common.minio import minio_client
from configs.settings import DefaultOption, config

DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')
requirement_prefix = config.get('REQUIREMENTS_CONFIG', 'prefix')

# Maps the homework type stored on JmHomework to the remote task_type value.
type_dict = {
    "Java": "java",
    "Python": "python",
    "Dag": "sparks"
}


def _build_af_task(jm_homework: models.JmHomework, relation_list, db: Session) -> dict:
    """Build the task payload shared by task creation and task update.

    Raises:
        KeyError: if ``jm_homework.type`` is not a key of ``type_dict``.
    """
    content = ''
    envs = {}
    if jm_homework.type == "Dag":
        content = red_dag_and_format(jm_homework, relation_list, db)
        requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
        if requirements_relation:
            # Only DAGs with a requirements relation get a package path.
            envs['requirement_package_path'] = (
                f'{requirement_prefix}{requirement_path}/dag_{jm_homework.dag_uuid.lower()}.zip'
            )
    elif jm_homework.type == "Python":
        content = red_python_and_format(jm_homework)

    is_java = jm_homework.type == "Java"
    is_dag = jm_homework.type == "Dag"
    return {
        "name": jm_homework.name,
        "file_urls": [jm_homework.script_file] if is_java else [],
        "script": "" if is_java else content,
        "cmd": "" if is_dag else jm_homework.execute_command,
        "cmd_parameters": "",
        "envs": envs,
        "run_image": "" if is_dag else jm_homework.image_url,
        "task_type": type_dict[jm_homework.type],
        "user_id": 0,
    }


def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Session):
    """Create a remote task for ``jm_homework`` and return the response's ``data``."""
    af_task = _build_af_task(jm_homework, relation_list, db)
    res = send_post('/af/af_task', af_task)
    return res['data']


def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Session):
    """Update the remote task linked to ``jm_homework`` and return the response's ``data``."""
    # Look the relation up first, as the original code did, so a failing
    # lookup surfaces before any payload (and MinIO read) work is done.
    relation = crud.get_af_id(db, jm_homework.id, 'task')
    af_task = _build_af_task(jm_homework, relation_list, db)
    res = send_put('/af/af_task', relation.af_id, af_task)
    return res['data']


def _build_af_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session) -> dict:
    """Build the job payload fields shared by job creation and job update.

    ``nodes`` are dicts with ``id`` and ``homework_id``; ``edges`` are dicts
    with ``source`` and ``target`` node ids.
    """
    homework_ids = [node['homework_id'] for node in nodes]
    node_uuid_to_h_id = {node['id']: node['homework_id'] for node in nodes}
    relations = crud.get_af_ids(db, homework_ids, 'task')
    se_id_to_af_id_dict = {relation.se_id: relation.af_id for relation in relations}
    tasks = [
        send_get("/af/af_task/getOnce", af_id)['data']
        for af_id in se_id_to_af_id_dict.values()
    ]
    # Each edge becomes a [source_af_id, target_af_id] pair.
    dependence = [
        [
            se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],
            se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]],
        ]
        for edge in edges
    ]
    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
    cron = cron.replace('?', '*')
    return {
        "tasks": tasks,
        "name": jm_job_info.name,
        "dependence": dependence,
        "cron": cron,
        "desc": jm_job_info.name,
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": jm_job_info.status,
    }


def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
    """Create a remote job, submit it, and return the creation response's ``data``."""
    af_job = _build_af_job(jm_job_info, nodes, edges, db)
    # These three fields are only sent on creation.
    af_job.update({
        "job_mode": 1,
        "job_type": 0,
        "user_id": 0,
    })
    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    return af_job


def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
    """Update the remote job linked to ``jm_job_info`` and re-submit it.

    Returns ``None``, unlike ``jm_job_create_job``.
    """
    af_job = _build_af_job(jm_job_info, nodes, edges, db)
    job_relation = crud.get_af_id(db, jm_job_info.id, 'job')
    res = send_put('/af/af_job', job_relation.af_id, af_job)
    af_job = res['data']
    send_submit(af_job['id'])


def _resolve_database_name(db: Session, ds_id):
    """Return the database name for ``ds_id``; ``-1`` selects ``DATABASE_NAME``."""
    if ds_id == -1:
        return DATABASE_NAME
    return crud.get_job_jdbc_datasource(db, ds_id).database_name


def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Session):
    """Load the DAG stored at ``jm_homework.dag_url`` and return it as a JSON string.

    ``datasource`` nodes become ``sql`` nodes with a generated SELECT,
    ``outputsource`` nodes become ``pyspark`` nodes with a generated writer
    script, and every other node keeps its own ``op`` and script.

    Args:
        jm_homework: homework whose ``dag_url`` points at the DAG JSON in MinIO.
        relation_list: dicts with ``node_uuid``, ``datasource_id`` and ``table``.
        db: session used to look up JDBC datasources.

    Returns:
        JSON string with ``sub_nodes`` and ``edges`` (``[source, target]`` pairs).
    """
    node_relation_dict = {relation['node_uuid']: relation for relation in relation_list}
    file = minio_client.get_file(jm_homework.dag_url)
    result = json.loads(file)
    edges = result['edges']
    nodes = result['nodes']

    sub_nodes = []
    for node in nodes:
        node_data = node['data']
        if node['op'] == 'datasource':
            columns = ','.join(field['dataField'] for field in node_data['input_source'])
            relation = node_relation_dict[node['id']]
            database_name = _resolve_database_name(db, relation['datasource_id'])
            script = f"select {columns} from `{database_name}`.`{relation['table']}`"
            sub_node = {
                "id": node['id'],
                "name": node['name'],
                "op": 'sql',
                "script": script,
            }
        elif node['op'] == 'outputsource':
            relation = node_relation_dict[node['id']]
            database_name = _resolve_database_name(db, relation['datasource_id'])
            # NOTE(review): the whitespace after "sc):" is reproduced as it
            # appears in the reviewed source (a single space); confirm against
            # version control whether it was originally a newline + indent.
            script = (
                'def main_func (input0, spark,sc): '
                'input0.write.format("orc").mode("overwrite").saveAsTable("'
                + database_name + '.' + relation['table'] + '")'
            )
            sub_node = {
                "id": node['id'],
                "name": node['name'],
                "op": 'pyspark',
                "inputs": node_data.get('inputs', {}),
                "script": script,
            }
        else:
            script = node_data['script']
            if node['op'] == 'sql':
                # SQL scripts are flattened onto a single line.
                script = script.replace('\n', ' ')
            sub_node = {
                "id": node['id'],
                "name": node['name'],
                "op": node['op'],
                "inputs": node_data.get('inputs', {}),
                "script": script,
            }
        sub_nodes.append(sub_node)

    res = {
        'sub_nodes': sub_nodes,
        'edges': [(edge['source'], edge['target']) for edge in edges]
    }
    return json.dumps(res)


def red_python_and_format(jm_homework):
    """Return the homework's Python script from MinIO, decoded as UTF-8.

    Falls back to ``/python/test.py`` when ``script_file`` is empty.
    """
    file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
    return file.decode("utf-8")


def _wait_for_parsed_time(af_job_id: int, retries: int, error_message: str, previous=None):
    """Poll until the job reports a ``last_parsed_time`` different from ``previous``.

    Checks up to ``retries + 1`` times, sleeping 2 seconds between checks.
    A response without the ``last_parsed_time`` key counts as "not parsed
    yet" and is retried rather than raising ``KeyError``.

    Raises:
        Exception: with ``error_message`` once all checks are exhausted.
    """
    for attempt in range(retries + 1):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data'].get('last_parsed_time')
        if last_parsed_time and last_parsed_time != previous:
            return last_parsed_time
        if attempt >= retries:
            raise Exception(error_message)
        time.sleep(2)


def on_off_control(af_job_id: int, status: int):
    """Send ``status`` via ``send_pause`` once the job reports a parsed time.

    Raises:
        Exception: if no parsed time is reported within 11 checks.
    """
    last_parsed_time = _wait_for_parsed_time(af_job_id, 10, f"{af_job_id}==>状态修改失败")
    send_pause(af_job_id, status)
    print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")


def execute_job(af_job_id: int):
    """Re-submit the job, wait for a new parsed time, then execute it.

    Returns:
        The result of ``send_execute``.

    Raises:
        Exception: if the parsed time does not change within 21 checks.
    """
    res = get_job_last_parsed_time(af_job_id)
    # Remember the parsed time before submitting so the change can be detected.
    current_time = res['data'].get('last_parsed_time')
    send_submit(af_job_id)
    last_parsed_time = _wait_for_parsed_time(
        af_job_id, 20, f"{af_job_id}==>文件正在转化中", previous=current_time
    )
    res = send_execute(af_job_id)
    print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
    return res