123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- import json
- from turtle import update
- from app import crud, models
- from app.crud.jm_homework_datasource_relation import get_jm_relations
- from app.utils.send_util import *
- from app.utils.utils import get_cmd_parameter
- from sqlalchemy.orm import Session
- from app.common.minio import FileHandler
# Maps a homework's `type` field to the `task_type` value sent to the
# /jpt/af_task endpoint.
type_dict = dict(
    Java="java",
    Python="python",
    Dag="sparks",
)
def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
    """Register a homework as a new task on the /jpt/af_task service.

    The script sent depends on the homework type:

    * "Dag" homeworks send the JSON rendered by ``red_dag_and_format``.
    * "Python" homeworks send the text returned by ``red_python_and_format``.
    * "Java" homeworks send an empty script and reference
      ``datax/<script_file>`` in ``file_urls`` instead.

    "Dag" homeworks send empty ``cmd`` and ``run_image``; all other types
    send the homework's ``execute_command`` and ``image_url``.

    After the POST, a 'task' relation from the homework id to the returned
    task id is stored via ``crud.create_relation``.

    Returns:
        The ``data`` field of the service response (the created task).

    Raises:
        KeyError: if ``jm_homework.type`` is not a key of ``type_dict``.
    """
    hw_type = jm_homework.type
    is_java = hw_type == "Java"
    is_dag = hw_type == "Dag"

    if is_dag:
        script = red_dag_and_format(jm_homework, db)
    elif hw_type == "Python":
        script = red_python_and_format(jm_homework)
    else:
        script = ''

    payload = {
        "name": jm_homework.name,
        "file_urls": ['datax/' + jm_homework.script_file] if is_java else [],
        "script": "" if is_java else script,
        "cmd": "" if is_dag else jm_homework.execute_command,
        "cmd_parameters": "",
        "envs": {},
        "run_image": "" if is_dag else jm_homework.image_url,
        "task_type": type_dict[hw_type],
        "user_id": 0,
    }
    created = send_post('/jpt/af_task', payload)['data']
    crud.create_relation(db, jm_homework.id, 'task', created['id'])
    return created
def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
    """Push the current state of a homework to its existing af_task.

    Looks up the 'task' relation for the homework and rebuilds the task
    payload from the homework's current fields:

    * "Dag" homeworks send the JSON rendered by ``red_dag_and_format`` and
      empty ``cmd`` / ``run_image``.
    * "Python" homeworks send the text returned by ``red_python_and_format``.
    * "Java" homeworks send an empty script and reference
      ``datax/<script_file>`` in ``file_urls``.

    The payload is then sent with ``send_put('/jpt/af_task', relation.af_id, ...)``.

    Returns:
        The ``data`` field of the service response (the updated task).

    Raises:
        KeyError: if ``jm_homework.type`` is not a key of ``type_dict``.
    """
    # NOTE(review): crud.get_af_id presumably returns None when no task was
    # created for this homework yet (jm_job_submit relies on that for jobs);
    # in that case the `relation.af_id` access below raises AttributeError.
    relation = crud.get_af_id(db, jm_homework.id, 'task')
    content = ''
    if jm_homework.type == "Dag":
        content = red_dag_and_format(jm_homework, db)
    elif jm_homework.type == "Python":
        content = red_python_and_format(jm_homework)
    af_task = {
        "name": jm_homework.name,
        "file_urls": [] if jm_homework.type != "Java" else ['datax/' + jm_homework.script_file],
        "script": content if jm_homework.type != "Java" else "",
        "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
        "cmd_parameters": "",
        "envs": {},
        "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
        "task_type": type_dict[jm_homework.type],
        "user_id": 0,
    }
    res = send_put('/jpt/af_task', relation.af_id, af_task)
    return res['data']
def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
    """Create the af_job for a job, record the relation, and submit it.

    Steps:

    1. Collect the job's nodes and map each node's homework id (``se_id``)
       to its af_task id via the stored 'task' relations.
    2. Fetch every task from /jpt/af_task/getOnce.
    3. Translate the job's edges into ``[in_af_id, out_af_id]`` dependency
       pairs.
    4. POST the job to /jpt/af_job, store a 'job' relation for the returned
       id, and call ``send_submit`` with that id.

    Returns None.

    Raises:
        KeyError: if an edge references a node with no 'task' relation.
    """
    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
    homework_ids = [node.homework_id for node in nodes]
    relations = crud.get_af_ids(db, homework_ids, 'task')
    se_id_to_af_id_dict = {relation.se_id: relation.af_id for relation in relations}
    tasks = [send_get("/jpt/af_task/getOnce", af_id)['data'] for af_id in se_id_to_af_id_dict.values()]
    edges = crud.get_one_job_edges(db, jm_job_info.id)
    # Normalise both endpoints with str(). The original converted only
    # out_node_id, which implies the relation keys are strings; an
    # un-normalised in_node_id of another type would miss the lookup.
    dependence = [
        [
            se_id_to_af_id_dict[str(edge['in_node_id'])],
            se_id_to_af_id_dict[str(edge['out_node_id'])],
        ]
        for edge in edges
    ]
    af_job = {
        "tasks": tasks,
        "name": jm_job_info.name,
        "dependence": dependence,
        # cron_type == 2 sends the stored cron expression; any other type
        # sends the literal 'onec'.
        # NOTE(review): 'onec' may be a typo for 'once'. It must match what
        # the af_job service expects, so it is left unchanged.
        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else 'onec',
        "desc": jm_job_info.name,
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": jm_job_info.status,
        "job_mode": 1,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/jpt/af_job', af_job)
    af_job = res['data']
    crud.create_relation(db, jm_job_info.id, 'job', af_job['id'])
    send_submit(af_job['id'])
def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
    """Update the existing af_job for a job and re-submit it.

    Builds the task list and dependency pairs the same way as job creation.
    The payload omits ``job_mode``, ``job_type`` and ``user_id``. It is sent
    with ``send_put('/jpt/af_job', <af_id>, ...)``, where the af_id comes from
    the job's stored 'job' relation. ``send_submit`` is then called with the
    returned job id.

    Returns None.

    Raises:
        KeyError: if an edge references a node with no 'task' relation.
    """
    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
    homework_ids = [node.homework_id for node in nodes]
    relations = crud.get_af_ids(db, homework_ids, 'task')
    se_id_to_af_id_dict = {relation.se_id: relation.af_id for relation in relations}
    tasks = [send_get("/jpt/af_task/getOnce", af_id)['data'] for af_id in se_id_to_af_id_dict.values()]
    edges = crud.get_one_job_edges(db, jm_job_info.id)
    # Normalise both endpoints with str(). The original converted only
    # out_node_id, which implies the relation keys are strings.
    dependence = [
        [
            se_id_to_af_id_dict[str(edge['in_node_id'])],
            se_id_to_af_id_dict[str(edge['out_node_id'])],
        ]
        for edge in edges
    ]
    af_job = {
        "tasks": tasks,
        "name": jm_job_info.name,
        "dependence": dependence,
        # NOTE(review): 'onec' is sent verbatim for non-cron jobs; it may be a
        # typo for 'once' but must match the service contract.
        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else 'onec',
        "desc": jm_job_info.name,
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": jm_job_info.status,
    }
    job_relation = crud.get_af_id(db, jm_job_info.id, 'job')
    res = send_put('/jpt/af_job', job_relation.af_id, af_job)
    af_job = res['data']
    send_submit(af_job['id'])
def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
    """Create the job's af_job, or update it if one is already recorded.

    The decision is based on whether ``crud.get_af_id`` finds a 'job'
    relation for ``jm_job_info.id``. Returns None.
    """
    existing_relation = crud.get_af_id(db, jm_job_info.id, 'job')
    if existing_relation is not None:
        jm_job_update_job(jm_job_info, db)
        return
    jm_job_create_job(jm_job_info, db)
def _build_select_script(fields, table):
    """Return ``select <f1>,<f2>,... from <table>`` built from field descriptors.

    Each descriptor's ``dataField`` value is used as a column name.
    """
    return 'select ' + ','.join(field['dataField'] for field in fields) + ' from ' + table


def _build_node_inputs(node_id, sources_by_target, output_index):
    """Return the ``{'input<i>': [source_id, output_no]}`` mapping for a node.

    Each source node's first consumer gets ``output_no`` 0; every later
    consumer gets the next number. ``output_index`` is updated in place so
    numbering continues across nodes.

    Raises:
        KeyError: if ``node_id`` has no incoming edges.
    """
    inputs = {}
    for position, source in enumerate(sources_by_target[node_id]):
        output_index[source] = output_index[source] + 1 if source in output_index else 0
        inputs[f'input{position}'] = [source, output_index[source]]
    return inputs


def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
    """Read a homework's DAG file and convert it into the af_task script JSON.

    The DAG JSON is read from ``'./dag' + jm_homework.dag_url``. Nodes are
    converted as follows:

    * 'datasource' nodes become 'sql' nodes whose script selects their
      ``input_source`` fields from the related table.
    * 'outputsource' nodes become 'sql' nodes that select their
      ``output_source`` fields and carry wired inputs.
    * Every other node keeps its ``op`` and ``script`` and gets wired inputs.

    The table for a node comes from ``get_jm_relations``, keyed by
    ``node_uuid``.

    Returns:
        A JSON string ``{"sub_nodes": [...], "edges": [[source, target], ...]}``.

    Raises:
        KeyError: if a datasource/outputsource node has no relation, or a
            non-datasource node has no incoming edge.
    """
    relations = get_jm_relations(db, jm_homework.id)
    node_relation_dict = {relation.node_uuid: relation for relation in relations}

    # Plain concatenation on purpose: dag_url is appended verbatim.
    # os.path.join/pathlib would discard the './dag' prefix if dag_url
    # starts with '/'.
    # `with` ensures the file is closed even if the JSON is malformed.
    with open('./dag' + jm_homework.dag_url, encoding='utf-8') as f:
        result = json.load(f)

    edges = result['edges']
    # target node id -> list of its source node ids, in edge order
    sources_by_target = {}
    for edge in edges:
        sources_by_target.setdefault(edge['target'], []).append(edge['source'])
    # source node id -> output number most recently assigned to a consumer
    output_index = {}

    sub_nodes = []
    for node in result['nodes']:
        node_id = node['id']
        if node['op'] == 'datasource':
            sub_nodes.append({
                "id": node_id,
                "name": node['name'],
                "op": 'sql',
                "script": _build_select_script(
                    node['data']['input_source'], node_relation_dict[node_id].table),
            })
        elif node['op'] == 'outputsource':
            script = _build_select_script(
                node['data']['output_source'], node_relation_dict[node_id].table)
            sub_nodes.append({
                "id": node_id,
                "name": node['name'],
                "op": 'sql',
                "inputs": _build_node_inputs(node_id, sources_by_target, output_index),
                "script": script,
            })
        else:
            inputs = _build_node_inputs(node_id, sources_by_target, output_index)
            sub_nodes.append({
                "id": node_id,
                "name": node['name'],
                "op": node['op'],
                "inputs": inputs,
                "script": node['data']['script'],
            })

    res = {
        'sub_nodes': sub_nodes,
        'edges': [(edge['source'], edge['target']) for edge in edges],
    }
    return json.dumps(res)
def red_python_and_format(jm_homework):
    """Fetch a Python script from the "datax" FileHandler and return it as text.

    The stored bytes are decoded as UTF-8.

    NOTE(review): the object path is hard-coded to "/python/test.py" and
    *jm_homework* is never used. This looks like a placeholder; confirm
    which file each homework should actually load.
    """
    raw_bytes = FileHandler("datax").get_file("/python/test.py")
    return raw_bytes.decode("utf-8")
|