liweiquan vor 2 Jahren
Ursprung
Commit
633a712a78

+ 3 - 1
app/routers/data_management.py

@@ -10,6 +10,7 @@ from app import schemas
 
 import app.crud as crud
 from app.services.dag import get_tmp_table_name
+from app.utils.send_util import data_transfer_run
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from app.common.hive import hiveDs
@@ -30,7 +31,8 @@ def create_data_management(item: schemas.DataManagementCreate, db: Session = Dep
     table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
     tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
     # 执行临时表的转存,目前还不能,先将临时表名存入
-    res = crud.create_data_management(db, item, tmp_table_name)
+    af_run_id = data_transfer_run(tmp_table_name, table_name)
+    res = crud.create_data_management(db, item, table_name)
     return res
 
 

+ 1 - 1
app/routers/jm_job_info.py

@@ -138,7 +138,7 @@ def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_
 @sxtimeit
 def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
     relation = crud.get_af_id(db, jm_job_id, 'job')
-    send_delete('/jpt/af_job', relation.af_id)
+    send_delete('/af/af_job', relation.af_id)
     return crud.delete_jm_job_info(db,jm_job_id)
 
 @router.put("/status")

+ 1 - 1
app/routers/job_info.py

@@ -96,7 +96,7 @@ def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depe
 @sxtimeit
 def delete_job_info(job_id: int, db: Session = Depends(get_db)):
     relation = crud.get_af_id(db, job_id, 'datax')
-    send_delete('/jpt/af_job', relation.af_id)
+    send_delete('/af/af_job', relation.af_id)
     return crud.delete_job_info(db, job_id)
 
 @router.post("/execute")

+ 5 - 5
app/services/dag.py

@@ -23,7 +23,7 @@ def dag_create_job(dag_uuid:str,dag_script: str,db: Session):
         "job_type": 0,
         "user_id": 0,
     }
-    res = send_post('/jpt/af_job', af_job)
+    res = send_post('/af/af_job', af_job)
     af_job = res['data']
     crud.create_debug_relation(db,dag_uuid,'debug',af_job['id'])
     return af_job
@@ -40,14 +40,14 @@ def dag_create_task(dag_script: str):
         "task_type": "sparks",
         "user_id": 0,
     }
-    res = send_post('/jpt/af_task', af_task)
+    res = send_post('/af/af_task', af_task)
     af_task = res['data']
     return af_task
 
 def dag_update_job(dag_uuid:str,dag_script: str, db: Session):
     relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
     af_job_id = relation.af_id
-    res = send_get("/jpt/af_job/getOnce",af_job_id)
+    res = send_get("/af/af_job/getOnce",af_job_id)
     old_af_job = res['data']
     old_af_task = old_af_job['tasks'][0]
     af_task = dag_put_task(dag_script,old_af_task)
@@ -63,7 +63,7 @@ def dag_update_job(dag_uuid:str,dag_script: str, db: Session):
         "executor_fail_retry_count": 0,
         "trigger_status": 1,
     }
-    res = send_put('/jpt/af_job', old_af_job['id'], af_job)
+    res = send_put('/af/af_job', old_af_job['id'], af_job)
     af_job = res['data']
     return af_job
 
@@ -79,7 +79,7 @@ def dag_put_task(dag_script: str,old_af_task):
         "run_image": "",
         "task_type": "sparks",
     }
-    res = send_put('/jpt/af_task', old_af_task['id'],af_task)
+    res = send_put('/af/af_task', old_af_task['id'],af_task)
     af_task = res['data']
     return af_task
 

+ 4 - 4
app/services/datax.py

@@ -23,7 +23,7 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
         "job_type": 0,
         "user_id": 0,
     }
-    res = send_post('/jpt/af_job', af_job)
+    res = send_post('/af/af_job', af_job)
     af_job = res['data']
     crud.create_relation(db, job_info.id,'datax', af_job['id'])
     send_submit(af_job['id'])
@@ -65,7 +65,7 @@ def datax_create_task(job_info: models.JobInfo):
 def datax_update_job(job_info: models.JobInfo, db: Session):
     relation = crud.get_af_id(db, job_info.id, 'datax')
     af_job_id = relation.af_id
-    res = send_get("/jpt/af_job/getOnce",af_job_id)
+    res = send_get("/af/af_job/getOnce",af_job_id)
     old_af_job = res['data']
     old_af_task = old_af_job['tasks'][0]
     af_task = datax_put_task(job_info,old_af_task)
@@ -83,7 +83,7 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
         "executor_fail_retry_count": job_info.executor_fail_retry_count,
         "trigger_status": job_info.trigger_status,
     }
-    res = send_put('/jpt/af_job', old_af_job['id'], af_job)
+    res = send_put('/af/af_job', old_af_job['id'], af_job)
     af_job = res['data']
     send_submit(af_job['id'])
     on_off_control(af_job['id'], job_info.trigger_status)
@@ -114,7 +114,7 @@ def datax_put_task(job_info: models.JobInfo,old_af_task):
         "envs": envs,
         "run_image": "",
     }
-    res = send_put('/jpt/af_task', old_af_task['id'],af_task)
+    res = send_put('/af/af_task', old_af_task['id'],af_task)
     af_task = res['data']
     return af_task
 

+ 8 - 8
app/services/jm_job.py

@@ -32,7 +32,7 @@ def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
         "task_type": type_dict[jm_homework.type],
         "user_id": 0,
     }
-    res = send_post('/jpt/af_task', af_task)
+    res = send_post('/af/af_task', af_task)
     af_task = res['data']
     crud.create_relation(db ,jm_homework.id, 'task', af_task['id'])
     return af_task
@@ -55,7 +55,7 @@ def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
         "task_type": type_dict[jm_homework.type],
         "user_id": 0,
     }
-    res = send_put('/jpt/af_task', relation.af_id, af_task)
+    res = send_put('/af/af_task', relation.af_id, af_task)
     af_task = res['data']
     return af_task
 
@@ -71,7 +71,7 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
     homework_ids = [node.homework_id for node in nodes]
     relations = crud.get_af_ids(db,homework_ids, 'task')
     se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
-    tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
+    tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
     edges = crud.get_one_job_edges(db, jm_job_info.id)
     dependence = [[se_id_to_af_id_dict[edge['in_node_id']],se_id_to_af_id_dict[str(edge['out_node_id'])]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
@@ -91,7 +91,7 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
         "job_type": 0,
         "user_id": 0,
     }
-    res = send_post('/jpt/af_job', af_job)
+    res = send_post('/af/af_job', af_job)
     af_job = res['data']
     crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
     send_submit(af_job['id'])
@@ -104,7 +104,7 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
     node_id_to_h_id = {node.id:node.homework_id for node in nodes}
     relations = crud.get_af_ids(db,homework_ids, 'task')
     se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
-    tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
+    tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
     edges = crud.get_one_job_edges(db, jm_job_info.id)
     dependence = [[se_id_to_af_id_dict[node_id_to_h_id[edge.in_node_id]],se_id_to_af_id_dict[node_id_to_h_id[edge.out_node_id]]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
@@ -122,7 +122,7 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
         "trigger_status": jm_job_info.status,
     }
     job_relation = crud.get_af_id(db,jm_job_info.id,'job')
-    res = send_put('/jpt/af_job', job_relation.af_id, af_job)
+    res = send_put('/af/af_job', job_relation.af_id, af_job)
     af_job = res['data']
     send_submit(af_job['id'])
     on_off_control(af_job['id'],jm_job_info.status)
@@ -172,7 +172,7 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
     input0.write.mode("overwrite").saveAsTable("'''+node_relation_dict[node['id']].table+'''")'''
             inputs = {}
             index = 0
-            input_list = t_s[node['id']]
+            input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
             for input in input_list:
                 if input in input_num.keys():
                     input_num[input]+=1
@@ -191,7 +191,7 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
         else:
             inputs = {}
             index = 0
-            input_list = t_s[node['id']]
+            input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
             for input in input_list:
                 if input in input_num.keys():
                     input_num[input]+=1

+ 16 - 6
app/utils/send_util.py

@@ -15,7 +15,7 @@ def send_post(uri,data):
         raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
 
 def send_submit(af_job_id):
-    res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/submit?id='+str(af_job_id))
+    res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/submit?id='+str(af_job_id))
     result = res.json()
     print(result)
     if 'code' in result.keys() and result['code'] == 200:
@@ -43,7 +43,7 @@ def send_get(uri,path_data):
 
 # 执行任务
 def send_execute(path_data):
-    res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(path_data)}/run')
+    res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/{str(path_data)}/run')
     result = res.json()
     print(result)
     if 'code' in result.keys() and result['code'] == 200:
@@ -54,7 +54,7 @@ def send_execute(path_data):
 # 起停任务
 def send_pause(af_job_id, status):
     flag = True if status == 0 else False
-    res = requests.patch(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(af_job_id)}/pause/{str(flag)}')
+    res = requests.patch(url=f'http://{HOST}:{PORT}/af/af_job/{str(af_job_id)}/pause/{str(flag)}')
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
@@ -73,7 +73,7 @@ def send_delete(uri, path_data):
 
 # 获取airflow端dag文件生成时间
 def get_job_last_parsed_time(path_data):
-    res = requests.get(url=f'http://{HOST}:{PORT}/jpt/af_job/{path_data}/last_parsed_time')
+    res = requests.get(url=f'http://{HOST}:{PORT}/af/af_job/{path_data}/last_parsed_time')
     result = res.json()
     print(result)
     if 'code' in result.keys() and result['code'] == 200:
@@ -83,10 +83,20 @@ def get_job_last_parsed_time(path_data):
 
 # 获取job某次运行的状态
 def get_job_run_status(path_data):
-    res = requests.get(url=f'http://{HOST}:{PORT}/jpt/af_run/{path_data}/status')
+    res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/{path_data}/status')
     result = res.json()
     print(result)
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('获取job某次运行的状态-->请求airflow失败-->'+result['msg'])
+        raise Exception('获取job某次运行的状态-->请求airflow失败-->'+result['msg'])
+
+# 中间结果转存
+def data_transfer_run(source_tb: str, target_tb: str):
+    res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/000/data_transfer_run?source_tb={source_tb}&target_tb={target_tb}')
+    result = res.json()
+    print(result)
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        raise Exception('中间结果转存,请求airflow失败-->'+result['msg'])