"""Push jm homework / job records to the remote af_task / af_job endpoints."""
import json
# NOTE(review): `update` is not used anywhere in the visible code; this looks
# like an accidental auto-import. Confirm and remove.
from turtle import update
from app import crud, models
from app.crud.jm_homework_datasource_relation import get_jm_relations
from app.utils.send_util import *
from app.utils.utils import get_cmd_parameter
from sqlalchemy.orm import Session
from app.common.minio import FileHandler

# Maps a homework's `type` to the `task_type` value sent to /jpt/af_task.
type_dict = {
    "Java": "java",
    "Python": "python",
    "Dag": "sparks"
}


def _build_af_task(jm_homework: models.JmHomework, db: Session) -> dict:
    """Build the af_task request payload shared by task create and update."""
    is_java = jm_homework.type == "Java"
    is_dag = jm_homework.type == "Dag"

    # Only Dag and Python homework carry an inline script.
    content = ''
    if is_dag:
        content = red_dag_and_format(jm_homework, db)
    elif jm_homework.type == "Python":
        content = red_python_and_format(jm_homework)

    return {
        "name": jm_homework.name,
        "file_urls": ['datax/' + jm_homework.script_file] if is_java else [],
        "script": "" if is_java else content,
        "cmd": "" if is_dag else jm_homework.execute_command,
        "cmd_parameters": "",
        "envs": {},
        "run_image": "" if is_dag else jm_homework.image_url,
        "task_type": type_dict[jm_homework.type],
        "user_id": 0,
    }


def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
    """Create an af_task for the homework and record the id relation.

    Posts the payload to ``/jpt/af_task`` and stores a 'task' relation
    between the homework id and the returned af_task id.

    Returns:
        The ``data`` field of the POST response.

    Raises:
        KeyError: if ``jm_homework.type`` is not a key of ``type_dict``.
    """
    res = send_post('/jpt/af_task', _build_af_task(jm_homework, db))
    af_task = res['data']
    crud.create_relation(db, jm_homework.id, 'task', af_task['id'])
    return af_task


def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
    """Update the af_task already related to the homework.

    Looks up the 'task' relation for the homework and PUTs the rebuilt
    payload to ``/jpt/af_task`` under the related af id.

    Returns:
        The ``data`` field of the PUT response.
    """
    relation = crud.get_af_id(db, jm_homework.id, 'task')
    res = send_put('/jpt/af_task', relation.af_id, _build_af_task(jm_homework, db))
    return res['data']


def jm_homework_submit(jm_homework: models.JmHomework, db: Session):
    """Create the homework's af_task if no relation exists, else update it."""
    task_relation = crud.get_af_id(db, jm_homework.id, 'task')
    if task_relation is None:
        jm_job_create_task(jm_homework, db)
    else:
        jm_job_update_task(jm_homework, db)


def _build_af_job(jm_job_info: models.JmJobInfo, db: Session) -> dict:
    """Build the af_job request fields shared by job create and update."""
    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
    homework_ids = [node.homework_id for node in nodes]
    node_id_to_h_id = {node.id: node.homework_id for node in nodes}

    relations = crud.get_af_ids(db, homework_ids, 'task')
    se_id_to_af_id = {relation.se_id: relation.af_id for relation in relations}
    tasks = [
        send_get("/jpt/af_task/getOnce", af_id)['data']
        for af_id in se_id_to_af_id.values()
    ]

    # Edges reference node ids; the relation dict is keyed by homework id,
    # so each endpoint is translated node id -> homework id -> af task id.
    edges = crud.get_one_job_edges(db, jm_job_info.id)
    dependence = [
        [
            se_id_to_af_id[node_id_to_h_id[edge.in_node_id]],
            se_id_to_af_id[node_id_to_h_id[edge.out_node_id]],
        ]
        for edge in edges
    ]

    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
    # str.replace returns a new string; the result must be re-bound.
    cron = cron.replace('?', '*')

    return {
        "tasks": tasks,
        "name": jm_job_info.name,
        "dependence": dependence,
        "cron": cron,
        "desc": jm_job_info.name,
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": jm_job_info.status,
    }


def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
    """Create an af_job for the job info, record the relation, then submit it.

    After the POST to ``/jpt/af_job`` the new job is submitted and
    ``send_pause`` is called with ``jm_job_info.status == 1``.
    """
    af_job = _build_af_job(jm_job_info, db)
    # Fields that are only sent on creation.
    af_job.update({
        "job_mode": 1,
        "job_type": 0,
        "user_id": 0,
    })
    res = send_post('/jpt/af_job', af_job)
    af_job = res['data']
    crud.create_relation(db, jm_job_info.id, 'job', af_job['id'])
    send_submit(af_job['id'])
    send_pause(af_job['id'], jm_job_info.status == 1)


def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
    """Update the af_job related to the job info, then submit it.

    After the PUT to ``/jpt/af_job`` the job is submitted and
    ``send_pause`` is called with ``jm_job_info.status == 1``.
    """
    af_job = _build_af_job(jm_job_info, db)
    job_relation = crud.get_af_id(db, jm_job_info.id, 'job')
    res = send_put('/jpt/af_job', job_relation.af_id, af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    send_pause(af_job['id'], jm_job_info.status == 1)


def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
    """Create the af_job if no 'job' relation exists yet, else update it."""
    job_relation = crud.get_af_id(db, jm_job_info.id, 'job')
    if job_relation is None:
        jm_job_create_job(jm_job_info, db)
    else:
        jm_job_update_job(jm_job_info, db)


def _build_select_script(fields, table) -> str:
    """Return ``select <dataField,...> from <table>`` for the given fields."""
    columns = ','.join(field['dataField'] for field in fields)
    return 'select ' + columns + ' from ' + table


def _collect_inputs(node_id, sources_by_target: dict, input_num: dict) -> dict:
    """Build the ``inputs`` mapping for one node.

    Each source feeding ``node_id`` becomes ``'input<i>': [source, n]``,
    where ``n`` counts (from 0) how many times that source has been used
    as an input so far. ``input_num`` is shared across nodes and is
    mutated in place.

    Raises:
        KeyError: if no edge targets ``node_id``.
    """
    inputs = {}
    for index, source in enumerate(sources_by_target[node_id]):
        if source in input_num:
            input_num[source] += 1
        else:
            input_num[source] = 0
        inputs['input' + str(index)] = [source, input_num[source]]
    return inputs


def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
    """Read the homework's DAG file and convert it to the task script JSON.

    The file at ``'./dag' + jm_homework.dag_url`` is parsed as JSON with
    ``nodes`` and ``edges``. 'datasource' and 'outputsource' nodes become
    'sql' sub-nodes with a generated select statement; every other node
    keeps its own ``op`` and ``data.script``. All nodes except
    'datasource' also get an ``inputs`` mapping derived from the edges.

    Returns:
        A JSON string with ``sub_nodes`` and ``edges`` (source/target pairs).
    """
    relations = get_jm_relations(db, jm_homework.id)
    node_relation_dict = {relation.node_uuid: relation for relation in relations}

    # Context manager closes the file even if reading or parsing fails.
    with open('./dag' + jm_homework.dag_url) as f:
        result = json.loads(f.read())

    edges = result['edges']
    # target node id -> list of source node ids, in edge order.
    t_s = {}
    for edge in edges:
        t_s.setdefault(edge['target'], []).append(edge['source'])

    input_num = {}
    sub_nodes = []
    for node in result['nodes']:
        node_id = node['id']
        if node['op'] == 'datasource':
            sub_node = {
                "id": node_id,
                "name": node['name'],
                "op": 'sql',
                "script": _build_select_script(
                    node['data']['input_source'],
                    node_relation_dict[node_id].table,
                ),
            }
        elif node['op'] == 'outputsource':
            script = _build_select_script(
                node['data']['output_source'],
                node_relation_dict[node_id].table,
            )
            sub_node = {
                "id": node_id,
                "name": node['name'],
                "op": 'sql',
                "inputs": _collect_inputs(node_id, t_s, input_num),
                "script": script,
            }
        else:
            sub_node = {
                "id": node_id,
                "name": node['name'],
                "op": node['op'],
                "inputs": _collect_inputs(node_id, t_s, input_num),
                "script": node['data']['script'],
            }
        sub_nodes.append(sub_node)

    res = {
        'sub_nodes': sub_nodes,
        'edges': [(edge['source'], edge['target']) for edge in edges]
    }
    return json.dumps(res)


def red_python_and_format(jm_homework: models.JmHomework):
    """Fetch the homework's Python script via ``FileHandler("datax")``.

    Falls back to ``/python/test.py`` when ``script_file`` is empty.

    Returns:
        The file content decoded as UTF-8.
    """
    file_handler = FileHandler("datax")
    file = file_handler.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
    return file.decode("utf-8")