liweiquan пре 2 година
родитељ
комит
9b58340124

+ 3 - 3
app/crud/jm_homework.py

@@ -5,7 +5,7 @@ from sqlalchemy.orm import Session
 from app.crud.constant import find_and_update
 
 from app.crud.jm_homework_datasource_relation import create_jm_hd_relation, delete_jm_relations, get_jm_relations
-from app.services.jm_job import jm_job_create_task, jm_job_update_task
+from app.services.jm_job import jm_homework_submit
 
 
 def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
@@ -28,10 +28,10 @@ def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
     db.add(db_item)
     db.commit()
     db.refresh(db_item)
-    jm_job_create_task(db_item, db)
     if jm_homework_create['type'] == 'Dag' and relation_list is not None:
         for relation in relation_list:
             create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
+    jm_homework_submit(db_item, db)
     return db_item.to_dict()
 
 def get_jm_homeworks(db: Session, project_id: str):
@@ -68,7 +68,6 @@ def update_jm_homework(db: Session, id: int, update_item: schemas.JmHomeworkUpda
     for k, v in jm_homework_update.items():
         setattr(db_item, k, v)
     db_item.update_time = int(time.time())
-    jm_job_update_task(db_item, db)
     db.commit()
     db.flush()
     db.refresh(db_item)
@@ -76,6 +75,7 @@ def update_jm_homework(db: Session, id: int, update_item: schemas.JmHomeworkUpda
     if jm_homework_update['type'] == 'Dag' and relation_list is not None:
         for relation in relation_list:
             create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
+    jm_homework_submit(db_item, db)
     return db_item.to_dict()
 
 def delete_jm_homework(db: Session, id: int):

+ 11 - 0
app/routers/files.py

@@ -68,6 +68,17 @@ def put_java_jar( file: UploadFile = File(...),):
     return url
 
 
+@router.post("/python/")
+@web_try()
+@sxtimeit
+def put_java_jar( file: UploadFile = File(...),):
+    print("UploadFile-->",file.filename)
+    file_handler = FileHandler("datax")
+    file_name = str(int(time.time()))+'_'+file.filename
+    url = file_handler.put_byte_file("python/"+file_name, file.file.read())
+    return url
+
+
 @router.post("/jm_job_log/")
 @web_try()
 @sxtimeit

+ 4 - 2
app/routers/jm_job_info.py

@@ -10,6 +10,7 @@ from app import models, schemas
 import app.crud as crud
 from app.schemas import cron_expression
 from app.utils.cron_utils import *
+from app.utils.send_util import send_execute
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
@@ -150,8 +151,9 @@ def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
     jm_job = crud.get_jm_job_info(db,jm_job_id)
     if jm_job.status == 0:
         raise Exception('任务已被停用')
-    # 进行api调用
-    return jm_job
+    relation = crud.get_af_id(db, jm_job_id, 'job')
+    res = send_execute(relation.af_id)
+    return res['data']
 
 
 @router.post("/cron_expression")

+ 2 - 1
app/routers/jm_job_log.py

@@ -29,7 +29,6 @@ def get_job_logs(job_id: int = None, db: Session = Depends(get_db)):
         jm_job_list = [crud.get_jm_job_info(db, job_id)]
     else:
         jm_job_list = crud.get_jm_job_infos(db)
-    jm_job_list = crud.get_jm_job_infos(db)
     id_to_job = {job.id:job for job in jm_job_list}
     relations = crud.get_af_ids(db,id_to_job.keys(), 'job')
     af_to_datax = {relation.af_id:relation.se_id for relation in relations}
@@ -56,6 +55,7 @@ def get_job_logs(job_id: int = None, db: Session = Depends(get_db)):
                 "end_time": task['end_time'] if task else 0,
             }
             res.append(log)
+    res.sort(key=lambda x: x['trigger_time'], reverse=True)
     return res
 
 @router.get("/logs")
@@ -78,4 +78,5 @@ def get_job_logs(run_id: str, job_id: int, db: Session = Depends(get_db)):
             "log": task['log'] if task else None
         }
         res.append(log)
+    res.sort(key=lambda x: x['trigger_time'], reverse=True)
     return res

+ 11 - 0
app/routers/job_info.py

@@ -102,4 +102,15 @@ def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depe
 def delete_job_info(job_id: int, db: Session = Depends(get_db)):
     return crud.delete_job_info(db, job_id)
 
+@router.post("/execute")
+@web_try()
+@sxtimeit
+def execute_job_info(job_id: int, db: Session = Depends(get_db)):
+    jm_job = crud.get_job_info(db, job_id)
+    if jm_job.trigger_status == 0:
+        raise Exception('任务已被停用')
+    relation = crud.get_af_id(db, job_id, 'datax')
+    res = send_execute(relation.af_id)
+    return res['data']
+
 add_pagination(router)

+ 6 - 3
app/routers/job_log.py

@@ -30,16 +30,18 @@ def create_job_log(item: schemas.JobLogCreate, db: Session = Depends(get_db)):
 @sxtimeit
 def get_job_logs(params: Params = Depends(), db: Session = Depends(get_db)):
     job_infos = crud.get_job_infos(db)
-    job_ids = [job.id for job in job_infos]
-    relations = crud.get_af_ids(db, job_ids, 'datax')
+    id_to_job = {job.id:job for job in job_infos}
+    relations = crud.get_af_ids(db, id_to_job.keys(), 'datax')
     af_to_datax = {relation.af_id:relation.se_id for relation in relations}
     af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
     res = []
     for af_job_run in af_job_runs:
         task = list(af_job_run.details['tasks'].values())[0] if len(list(af_job_run.details['tasks'].values()))>0 else None
+        job_id = af_to_datax[int(af_job_run.job_id)]
         log = {
             "id": af_job_run.id,
-            "job_id": af_to_datax[int(af_job_run.job_id)],
+            "job_id": job_id,
+            "job_desc": id_to_job[job_id].job_desc,
             "af_job_id": int(af_job_run.job_id),
             "run_id": af_job_run.run_id,
             "trigger_time": af_job_run.start_time,
@@ -50,6 +52,7 @@ def get_job_logs(params: Params = Depends(), db: Session = Depends(get_db)):
             "log": task['log'] if task else None
         }
         res.append(log)
+    res.sort(key=lambda x: x['trigger_time'], reverse=True)
     return paginate(res, params)
 
 

+ 10 - 4
app/services/jm_job.py

@@ -57,6 +57,13 @@ def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
     af_task = res['data']
     return af_task
 
+def jm_homework_submit(jm_homework: models.JmHomework, db: Session):
+    task_relation = crud.get_af_id(db,jm_homework.id,'task')
+    if task_relation is None:
+        jm_job_create_task(jm_homework, db)
+    else:
+        jm_job_update_task(jm_homework, db)
+
 def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
     nodes = crud.get_one_job_nodes(db, jm_job_info.id)
     homework_ids = [node.homework_id for node in nodes]
@@ -69,7 +76,7 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
         "tasks": tasks,
         "name": jm_job_info.name,
         "dependence": dependence,
-        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else 'onec',
+        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@onec',
         "desc": jm_job_info.name,
         "route_strategy": "",
         "block_strategy": "",
@@ -98,7 +105,7 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
         "tasks": tasks,
         "name": jm_job_info.name,
         "dependence": dependence,
-        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else 'onec',
+        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@onec',
         "desc": jm_job_info.name,
         "route_strategy": "",
         "block_strategy": "",
@@ -122,7 +129,6 @@ def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
 def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
     relations = get_jm_relations(db,jm_homework.id)
     node_relation_dict = { relation.node_uuid:relation for relation in relations}
-
     f = open('./dag' + jm_homework.dag_url)
     lines = f.read()
     result = json.loads(lines)
@@ -204,5 +210,5 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
 
 def red_python_and_format(jm_homework):
     file_handler = FileHandler("datax")
-    file = file_handler.get_file("/python/test.py")
+    file = file_handler.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
     return file.decode("utf-8")

+ 13 - 4
app/utils/send_util.py

@@ -12,7 +12,7 @@ def send_post(uri,data):
         return res.json()
     else:
         print(result)
-        raise Exception('请求airflow失败-->'+result['msg'])
+        raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
 
 def send_submit(af_job_id):
     res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/submit?id='+str(af_job_id))
@@ -21,7 +21,7 @@ def send_submit(af_job_id):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败-->'+result['msg'])
+        raise Exception('提交任务,请求airflow失败-->'+result['msg'])
 
 
 def send_put(uri,path_data,data):
@@ -30,7 +30,7 @@ def send_put(uri,path_data,data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败-->'+result['msg'])
+        raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
 
 def send_get(uri,path_data):
     res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
@@ -38,4 +38,13 @@ def send_get(uri,path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('请求airflow失败-->'+result['msg'])
+        raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
+
+
+def send_execute(path_data):
+    res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(path_data)}/run')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        raise Exception('执行一次任务,请求airflow失败-->'+result['msg'])