liweiquan 2 years ago
parent commit d7126a4a75

+ 1 - 1
app/common/minio.py

@@ -93,6 +93,6 @@ class FileHandler(object):
             return objects
         except ResponseError:
            raise Exception("Failed to list files")
-        return objects
 
 
+datax_client = FileHandler('datax')
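
For reference, a minimal usage sketch of the new module-level client (the method names come from the FileHandler calls made elsewhere in this commit; the object key is a placeholder):

    from app.common.minio import datax_client

    # Hypothetical object key; get_file/put_byte_file are the methods used in app/routers/files.py.
    data: bytes = datax_client.get_file("java/example.jar")
    url = datax_client.put_byte_file("java/example.jar", data)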

+ 5 - 0
app/crud/af_run.py

@@ -22,6 +22,11 @@ def get_airflow_runs(db: Session):
     res: List[models.AirflowRun] = db.query(models.AirflowRun).all()
     return res
 
+def get_airflow_runs_by_af_job_ids(db: Session, job_ids: List[int]):
+    res: List[models.AirflowRun] = db.query(models.AirflowRun)\
+        .filter(models.AirflowRun.job_id.in_(job_ids)).all()
+    return res
+
 
 def get_airflow_run_once(db: Session, run_id: str, job_id: int):
     res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.run_id == run_id,
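
The new helper is the batched counterpart of get_airflow_run_once; a quick sketch of the intended call (the ids are hypothetical Airflow-side job ids):

    # db is a SQLAlchemy Session, e.g. from the usual get_db dependency.
    runs = get_airflow_runs_by_af_job_ids(db, [101, 102])
    for run in runs:
        print(run.job_id, run.run_id)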

+ 25 - 0
app/routers/files.py

@@ -1,4 +1,5 @@
 import io
+import time
 import uuid
 from fastapi import APIRouter
 
@@ -42,6 +43,30 @@ def get_file( uri: str):
     response.headers["Content-Disposition"] = "attachment; filename="+uri+".table"
     return response
 
+@router.get("/java/")
+@sxtimeit
+def get_java_file(uri: str):
+    file_handler = FileHandler("datax")
+    file = file_handler.get_file(uri)
+    code = 200
+    if len(file) == 0:
+        code = 404
+
+    response = StreamingResponse(io.BytesIO(file), status_code=code, media_type="application/octet-stream")
+    # Set the download filename via the response headers
+    response.headers["Content-Disposition"] = "attachment; filename="+uri
+    return response
+
+@router.post("/java/")
+@web_try()
+@sxtimeit
+def put_java_jar(file: UploadFile = File(...)):
+    print("UploadFile -->", file.filename)
+    file_handler = FileHandler("datax")
+    file_name = str(int(time.time()))+'_'+file.filename
+    url = file_handler.put_byte_file("java/"+file_name, file.file.read())
+    return url
+
 
 @router.post("/jm_job_log/")
 @web_try()
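
A hedged client-side sketch of the two new /java/ endpoints (base URL and object key are assumptions; the server prefixes each upload with a Unix timestamp):

    import requests

    BASE = "http://localhost:10086"  # placeholder host/port

    # Upload: the jar is stored as java/<timestamp>_<filename>, and the storage url is returned.
    with open("my-job.jar", "rb") as f:
        upload = requests.post(BASE + "/java/", files={"file": f})

    # Download by object uri (the key produced by the upload; value here is illustrative).
    download = requests.get(BASE + "/java/", params={"uri": "java/1660000000_my-job.jar"})
    jar_bytes = download.content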

+ 27 - 1
app/routers/jm_job_info.py

@@ -65,8 +65,34 @@ def create_jm_job_node(db: Session, nodes, edges, job_id):
 def get_jm_job_infos(db: Session = Depends(get_db)):
     res_list = []
     jm_job_list = crud.get_jm_job_infos(db)
+    jm_job_ids = [job.id for job in jm_job_list]
+    relations = crud.get_af_ids(db, jm_job_ids, 'job')
+    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
+    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
+    res = {}
+    for af_job_run in af_job_runs:
+        tasks = list(af_job_run.details['tasks'].values())
+        if len(tasks) > 0:
+            task = tasks[-1]
+            task.pop('log', None)
+            job_id = af_to_datax[int(af_job_run.job_id)]
+            log = {
+                "id": af_job_run.id,
+                "job_id": job_id,
+                "af_job_id": int(af_job_run.job_id),
+                "run_id": af_job_run.run_id,
+                "trigger_time": af_job_run.start_time,
+                "trigger_result": 1 if task else 0,
+                "execute_time": task['start_time'] if task else 0,
+                "execute_result": 1 if task and task['status'] == 'success' else 0,
+                "end_time": task['end_time'] if task else 0,
+            }
+            if job_id in res.keys():
+                res[job_id].append(log)
+            else:
+                res.update({job_id: [log]})
     for jm_job in jm_job_list:
-        history = crud.get_one_job_historys(db, jm_job.id)
+        history = res.get(jm_job.id, [])
         jm_job_dict = jm_job.to_dict()
         jm_job_dict.update({'history':history[0:10]})
         res_list.append(jm_job_dict)
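
The block above is a two-step join: map each Airflow job id back to its local job id via af_to_datax, then group run logs per job. The grouping can be read as the following equivalent sketch (build_log is a hypothetical stand-in for the dict literal in the handler):

    from collections import defaultdict

    res = defaultdict(list)
    for af_job_run in af_job_runs:
        tasks = list(af_job_run.details['tasks'].values())
        if tasks:
            job_id = af_to_datax[int(af_job_run.job_id)]
            res[job_id].append(build_log(af_job_run, tasks[-1]))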

+ 45 - 38
app/routers/jm_job_log.py

@@ -24,50 +24,57 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def get_job_logs(job_id: int = None, db: Session = Depends(get_db)):
-    jm_job_infos = []
+    jm_job_list = []
     if job_id is not None:
-        jm_job_infos = [crud.get_jm_job_info(db, job_id)]
+        jm_job_list = [crud.get_jm_job_info(db, job_id)]
     else:
-        jm_job_infos = crud.get_jm_job_infos(db)
-    job_id_to_job = {jm_job.id:jm_job for jm_job in jm_job_infos}
-    jm_job_id_list = job_id_to_job.keys()
-    job_history_list = crud.get_historys_by_job_ids(db,jm_job_id_list)
+        jm_job_list = crud.get_jm_job_infos(db)
+    id_to_job = {job.id:job for job in jm_job_list}
+    relations = crud.get_af_ids(db, id_to_job.keys(), 'job')
+    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
+    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
     res = []
-    for job_history in job_history_list:
-        jm_job = job_id_to_job[job_history.job_id]
-        job_history_dict = job_history.__dict__
-        job_history_dict.update({"job_name":jm_job.name})
-        job_history_dict.update({"job_type":jm_job.type})
-        job_history_dict.update({"job_tag":jm_job.tag})
-        res.append(job_history_dict)
+    for af_job_run in af_job_runs:
+        tasks = list(af_job_run.details['tasks'].values())
+        if len(tasks) > 0:
+            task = tasks[-1]
+            task.pop('log', None)
+            job_id = af_to_datax[int(af_job_run.job_id)]
+            log = {
+                "id": af_job_run.id,
+                "job_id": job_id,
+                "job_name": id_to_job[job_id].name,
+                "job_type": id_to_job[job_id].type,
+                "job_tag": id_to_job[job_id].tag,
+                "af_job_id": int(af_job_run.job_id),
+                "run_id": af_job_run.run_id,
+                "trigger_time": af_job_run.start_time,
+                "trigger_result": 1 if task else 0,
+                "execute_time": task['start_time'] if task else 0,
+                "execute_result": 1 if task and task['status'] == 'success' else 0,
+                "end_time": task['end_time'] if task else 0,
+            }
+            res.append(log)
     return res
 
 @router.get("/logs")
 @web_try()
 @sxtimeit
-def get_job_logs(job_history_id: int,db: Session = Depends(get_db)):
-    job_history_info = crud.get_jm_job_history_info(db,job_history_id)
-    job_info = crud.get_jm_job_info(db,job_history_info.job_id)
-    job_logs = crud.get_jm_job_logs_by_history_id(db,job_history_id)
-    if len(job_logs) <= 0:
-        raise Exception("No logs were found for this run of the job")
-    if job_info.type == '单作业离线任务':
-        return {
-            'job_type': job_info.type,
-            'logs': job_logs,
+def get_job_log_once(run_id: str, job_id: int, db: Session = Depends(get_db)):
+    af_job_run = crud.get_airflow_run_once(db, run_id, job_id)
+    tasks = list(af_job_run.details['tasks'].values())
+    res = []
+    for task in tasks:
+        log = {
+            "id": af_job_run.id,
+            "af_job_id": int(af_job_run.job_id),
+            "run_id": af_job_run.run_id,
+            "trigger_time": af_job_run.start_time,
+            "trigger_result": 1 if task else 0,
+            "execute_time": task['start_time'] if task else 0,
+            "execute_result": 1 if task and task['status'] == 'success' else 0,
+            "end_time": task['end_time'] if task else 0,
+            "log": task['log'] if task else None
         }
-    res = {}
-    for job_log in job_logs:
-        if job_log.homework_id in res.keys():
-            res[job_log.homework_id]['nodes'].append(job_log)
-        else:
-            res.update({job_log.homework_id:{
-                "homework_name":job_log.homework_name,
-                "nodes": [job_log]
-            }})
-
-    logs = [v for k, v in res.items()]
-    return {
-        'job_type': job_info.type,
-        'logs': logs,
-    }
+        res.append(log)
+    return res
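
Both handlers in this file pull the task list out of run.details the same way; a single-evaluation form of that extraction (which also guards against a missing 'tasks' key, something the original assumes is always present) would be:

    tasks = list(af_job_run.details.get('tasks', {}).values())
    task = tasks[-1] if tasks else None  # latest task attempt, or None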

+ 38 - 4
app/routers/job_log.py

@@ -29,11 +29,45 @@ def create_job_log(item: schemas.JobLogCreate, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def get_job_logs(params: Params = Depends(), db: Session = Depends(get_db)):
-    return paginate(crud.get_job_logs(db), params)
+    job_infos = crud.get_job_infos(db)
+    job_ids = [job.id for job in job_infos]
+    relations = crud.get_af_ids(db, job_ids, 'datax')
+    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
+    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
+    res = []
+    for af_job_run in af_job_runs:
+        task = next(iter(af_job_run.details['tasks'].values()), None)
+        log = {
+            "id": af_job_run.id,
+            "job_id": af_to_datax[int(af_job_run.job_id)],
+            "af_job_id": int(af_job_run.job_id),
+            "run_id": af_job_run.run_id,
+            "trigger_time": af_job_run.start_time,
+            "trigger_result": 1 if task else 0,
+            "execute_time": task['start_time'] if task else 0,
+            "execute_result": 1 if task and task['status'] == 'success' else 0,
+            "end_time": task['end_time'] if task else 0,
+            "log": task['log'] if task else None
+        }
+        res.append(log)
+    return paginate(res, params)
 
 
-@router.get("/getOnce/{id}")
+@router.get("/getOnce")
 @web_try()
 @sxtimeit
-def get_job_logs_once(id: int, db: Session = Depends(get_db)):
-    return crud.get_job_log_once(db, id)
+def get_job_logs_once(run_id: str, job_id: int, db: Session = Depends(get_db)):
+    af_job_run = crud.get_airflow_run_once(db, run_id, job_id)
+    task = next(iter(af_job_run.details['tasks'].values()), None)
+    log = {
+        "id": af_job_run.id,
+        "af_job_id": int(af_job_run.job_id),
+        "run_id": af_job_run.run_id,
+        "trigger_time": af_job_run.start_time,
+        "trigger_result": 1 if task else 0,
+        "execute_time": task['start_time'] if task else 0,
+        "execute_result": 1 if task and task['status'] == 'success' else 0,
+        "end_time": task['end_time'] if task else 0,
+        "log": task['log'] if task else None
+    }
+    return log
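
Note that paginate is now applied to a plain list built in memory rather than a query; with fastapi-pagination (already in use here via Params) that is the sequence form, e.g.:

    from fastapi_pagination import Params, paginate

    page = paginate([{"id": 1}, {"id": 2}, {"id": 3}], Params(page=1, size=2))
    # page.items -> the first two dicts, page.total -> 3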

+ 1 - 1
app/schemas/jm_homework.py

@@ -20,7 +20,7 @@ class JmHomeworkBase(BaseModel):
     # script file
     script_file: str
     # execute command
-    execute_command: str
+    execute_command: Optional[str]
     # user ID
     user_id: str
     # project ID
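
Relaxing the field relies on typing.Optional being imported in the module; a minimal pydantic sketch of the effect, assuming pydantic v1 semantics where a bare Optional field defaults to None (class name is illustrative):

    from typing import Optional
    from pydantic import BaseModel

    class Example(BaseModel):
        execute_command: Optional[str]  # may now be null/omitted, e.g. for Dag-type homework

    Example()  # validates; execute_command defaults to None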

+ 1 - 0
app/services/datax.py

@@ -16,6 +16,7 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
         "executor_timeout": job_info.executor_timeout,
         "executor_fail_retry_count": job_info.executor_fail_retry_count,
         "trigger_status": job_info.trigger_status,
+        "job_mode":1,
         "job_type": 0,
         "user_id": 0,
     }

+ 7 - 9
app/services/jm_job.py

@@ -21,7 +21,7 @@ def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
         content = red_python_and_format(jm_homework)
     af_task = {
         "name": jm_homework.name,
-        "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
+        "file_urls": [] if jm_homework.type != "Java" else ['datax/'+jm_homework.script_file],
         "script": content if jm_homework.type != "Java" else "",
         "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
         "cmd_parameters": "",
@@ -44,7 +44,7 @@ def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
         content = red_python_and_format(jm_homework)
     af_task = {
         "name": jm_homework.name,
-        "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
+        "file_urls": [] if jm_homework.type != "Java" else ['datax/'+jm_homework.script_file],
         "script": content if jm_homework.type != "Java" else "",
         "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
         "cmd_parameters": "",
@@ -76,6 +76,7 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
         "executor_timeout": 0,
         "executor_fail_retry_count": 0,
         "trigger_status": jm_job_info.status,
+        "job_mode":1,
         "job_type": 0,
         "user_id": 0,
     }
@@ -142,7 +143,7 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             script = 'select '
             for filed in fileds:
                 script += filed['dataField'] + ','
-            script.strip(',')
+            script = script.strip(',')
             script += ' from ' + node_relation_dict[node['id']].table
             sub_node = {
                 "id": node['id'],
@@ -156,7 +157,7 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             script = 'select '
             for filed in fileds:
                 script += filed['dataField'] + ','
-            script.strip([','])
+            script = script.strip(',')
             script += ' from ' + node_relation_dict[node['id']].table
             inputs = {}
             index = 0
@@ -196,12 +197,9 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             }
             sub_nodes.append(sub_node)
     res = {
-        'dag_script': {
-            'sub_nodes': sub_nodes,
-            'edges': [(edge['source'],edge['target']) for edge in edges]
-        }
+        'sub_nodes': sub_nodes,
+        'edges': [(edge['source'],edge['target']) for edge in edges]
     }
-
     return json.dumps(res)
 
 def red_python_and_format(jm_homework):
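
The two strip fixes above matter because Python strings are immutable: str.strip returns a new string, so the bare call discarded its result (and the earlier strip([',']) raised a TypeError, since the argument must be a string). For example:

    script = 'select a,b,c,'
    script.strip(',')           # result discarded; script is unchanged
    script = script.strip(',')  # 'select a,b,c' - rebinding is required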

File diff suppressed because it is too large
+ 0 - 178
dag/demo.dag


+ 4 - 1
production.ini

@@ -13,4 +13,7 @@ port = 10086
 [MINIO]
 url = minio-api.sxkj.com
 access_key = admin
-secret_key = sxkjadmin
+secret_key = sxkjadmin
+[AIRFLOW]
+host = 192.168.199.109
+port = 18082
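
A sketch of how the new [AIRFLOW] section might be read (standard-library configparser; the ini path is a placeholder):

    import configparser

    config = configparser.ConfigParser()
    config.read('production.ini')
    host = config.get('AIRFLOW', 'host')     # '192.168.199.109'
    port = config.getint('AIRFLOW', 'port')  # 18082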

Some files were not shown because too many files changed in this diff