123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- from asyncio import current_task
- import json
- import time
- from turtle import update
- from app import crud, models, schemas
- from app.common import minio
- from app.core.datasource.datasource import DataSourceBase
- from app.crud.jm_homework_datasource_relation import get_jm_relations
- from app.utils.send_util import *
- from sqlalchemy.orm import Session
- from app.common.minio import minio_client
- from configs.settings import DefaultOption, config
- DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
- requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')
- requirement_prefix = config.get('REQUIREMENTS_CONFIG', 'prefix')
- type_dict = {
- "Java": "java",
- "Python": "python",
- "Dag": "sparks"
- }
- def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Session):
- content = ''
- envs = {}
- if jm_homework.type == "Dag":
- content = red_dag_and_format(jm_homework, relation_list, db)
- requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
- if requirements_relation:
- envs.update({'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{jm_homework.dag_uuid.lower()}.zip'})
- elif jm_homework.type == "Python":
- content = red_python_and_format(jm_homework)
- af_task = {
- "name": jm_homework.name,
- "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
- "script": content if jm_homework.type != "Java" else "",
- "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
- "cmd_parameters": "",
- "envs": envs,
- "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
- "task_type": type_dict[jm_homework.type],
- "user_id": 0,
- }
- res = send_post('/af/af_task', af_task)
- af_task = res['data']
- return af_task
- def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Session):
- relation = crud.get_af_id(db, jm_homework.id, 'task')
- content = ''
- envs = {}
- if jm_homework.type == "Dag":
- content = red_dag_and_format(jm_homework, relation_list, db)
- requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
- if requirements_relation:
- envs.update({'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{jm_homework.dag_uuid.lower()}.zip'})
- elif jm_homework.type == "Python":
- content = red_python_and_format(jm_homework)
- af_task = {
- "name": jm_homework.name,
- "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
- "script": content if jm_homework.type != "Java" else "",
- "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
- "cmd_parameters": "",
- "envs": envs,
- "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
- "task_type": type_dict[jm_homework.type],
- "user_id": 0,
- }
- res = send_put('/af/af_task', relation.af_id, af_task)
- af_task = res['data']
- return af_task
- def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
- homework_ids = [node['homework_id'] for node in nodes]
- node_uuid_to_h_id = {node['id']:node['homework_id'] for node in nodes}
- relations = crud.get_af_ids(db,homework_ids, 'task')
- se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
- tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
- dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
- cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
- cron = cron.replace('?','*')
- af_job = {
- "tasks": tasks,
- "name": jm_job_info.name,
- "dependence": dependence,
- "cron": cron,
- "desc": jm_job_info.name,
- "route_strategy": "",
- "block_strategy": "",
- "executor_timeout": 0,
- "executor_fail_retry_count": 0,
- "trigger_status": jm_job_info.status,
- "job_mode":1,
- "job_type": 0,
- "user_id": 0,
- }
- res = send_post('/af/af_job', af_job)
- af_job = res['data']
- send_submit(af_job['id'])
- return af_job
- def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
- homework_ids = [node['homework_id'] for node in nodes]
- node_uuid_to_h_id = {node['id']:node['homework_id'] for node in nodes}
- relations = crud.get_af_ids(db,homework_ids, 'task')
- se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
- tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
- dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
- cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
- cron = cron.replace('?','*')
- af_job = {
- "tasks": tasks,
- "name": jm_job_info.name,
- "dependence": dependence,
- "cron": cron,
- "desc": jm_job_info.name,
- "route_strategy": "",
- "block_strategy": "",
- "executor_timeout": 0,
- "executor_fail_retry_count": 0,
- "trigger_status": jm_job_info.status,
- }
- job_relation = crud.get_af_id(db,jm_job_info.id,'job')
- res = send_put('/af/af_job', job_relation.af_id, af_job)
- af_job = res['data']
- send_submit(af_job['id'])
- def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Session):
- node_relation_dict = { relation['node_uuid']:relation for relation in relation_list}
- file = minio_client.get_file(jm_homework.dag_url)
- result = json.loads(file)
- edges = result['edges']
- # t_s = {}
- # input_num = {}
- # for edge in edges:
- # if edge['target'] in t_s.keys():
- # t_s[edge['target']].append(edge['source'])
- # else:
- # t_s.update({edge['target']:[edge['source']]})
- nodes = result['nodes']
- sub_nodes = []
- for node in nodes:
- if node['op'] == 'datasource':
- fileds = node['data']['input_source']
- script = 'select '
- for filed in fileds:
- script += filed['dataField'] + ','
- script = script.strip(',')
- ds_id = node_relation_dict[node['id']]['datasource_id']
- database_name = ""
- if ds_id == -1:
- database_name = DATABASE_NAME
- else:
- data_source = crud.get_job_jdbc_datasource(db,ds_id)
- database_name = data_source.database_name
- script += ' from `' + database_name + '`.`'+node_relation_dict[node['id']]['table']+ '`'
- sub_node = {
- "id": node['id'],
- "name": node['name'],
- "op": 'sql',
- "script":script
- }
- sub_nodes.append(sub_node)
- elif node['op'] == 'outputsource':
- fileds = node['data']['output_source']
- ds_id = node_relation_dict[node['id']]['datasource_id']
- database_name = ""
- if ds_id == -1:
- database_name = DATABASE_NAME
- else:
- data_source = crud.get_job_jdbc_datasource(db,ds_id)
- database_name = data_source.database_name
- script = '''def main_func (input0, spark,sc):
- input0.write.mode("overwrite").saveAsTable("''' + database_name + '.'+node_relation_dict[node['id']]['table']+'''")'''
- inputs = node['data']['inputs'] if 'inputs' in node['data'].keys() else {}
- # index = 0
- # input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
- # for input in input_list:
- # if input in input_num.keys():
- # input_num[input]+=1
- # else:
- # input_num.update({input:0})
- # inputs.update({'input'+str(index):[input,input_num[input]]})
- # index+=1
- sub_node = {
- "id": node['id'],
- "name": node['name'],
- "op": 'pyspark',
- "inputs": inputs,
- "script":script
- }
- sub_nodes.append(sub_node)
- else:
- inputs = node['data']['inputs'] if 'inputs' in node['data'].keys() else {}
- # index = 0
- # input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
- # for input in input_list:
- # if input in input_num.keys():
- # input_num[input]+=1
- # else:
- # input_num.update({input:0})
- # inputs.update({'input'+str(index):[input,input_num[input]]})
- # index+=1
- script = node['data']['script']
- if node['op'] == 'sql':
- script = script.replace('\n', ' ')
- sub_node = {
- "id": node['id'],
- "name": node['name'],
- "op": node['op'],
- "inputs": inputs,
- "script": script,
- }
- sub_nodes.append(sub_node)
- res = {
- 'sub_nodes': sub_nodes,
- 'edges': [(edge['source'],edge['target']) for edge in edges]
- }
- return json.dumps(res)
- def red_python_and_format(jm_homework):
- file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
- return file.decode("utf-8")
- def on_off_control(af_job_id: int,status: int):
- for i in range(0,11):
- parsed_res = get_job_last_parsed_time(af_job_id)
- last_parsed_time = parsed_res['data']['last_parsed_time']
- if last_parsed_time:
- send_pause(af_job_id,status)
- print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")
- break
- if i >= 10:
- raise Exception(f"{af_job_id}==>状态修改失败")
- time.sleep(2)
- def execute_job(af_job_id: int):
- res = get_job_last_parsed_time(af_job_id)
- current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None
- send_submit(af_job_id)
- for i in range(0,21):
- parsed_res = get_job_last_parsed_time(af_job_id)
- last_parsed_time = parsed_res['data']['last_parsed_time']
- if last_parsed_time and last_parsed_time != current_time:
- res = send_execute(af_job_id)
- print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
- return res
- if i >= 20:
- raise Exception(f"{af_job_id}==>文件正在转化中")
- time.sleep(2)
|