Browse Source

1. 修复日志中文乱码的bug

luoyulong 2 năm trước cách đây
mục cha
commit
96291b28ff
3 tập tin đã thay đổi với 12 bổ sung9 xóa
  1. 5 3
      app/crud/af_run.py
  2. 6 6
      app/routers/run.py
  3. 1 0
      server.py

+ 5 - 3
app/crud/af_run.py

@@ -22,14 +22,16 @@ def get_airflow_runs(db: Session):
     res: List[models.AirflowRun] = db.query(models.AirflowRun).all()
     return res
 
+
 def get_airflow_runs_by_af_job_ids(db: Session, job_ids: List[int]):
-    res: List[models.AirflowRun] = db.query(models.AirflowRun)\
+    res: List[models.AirflowRun] = db.query(models.AirflowRun) \
         .filter(models.AirflowRun.job_id.in_(job_ids)).all()
     return res
 
 
-def get_airflow_run_once_normal_mode(db: Session, af_run_id: str):
-    res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.af_run_id == af_run_id).first()
+def get_airflow_run_once_normal_mode(db: Session, job_id, af_run_id: str):
+    res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.af_run_id == af_run_id,
+                                                                models.AirflowRun.job_id == job_id).first()
     return res
 
 

+ 6 - 6
app/routers/run.py

@@ -119,11 +119,11 @@ def add_notification(item: Item, db: Session = Depends(get_db)):
         time.sleep(1)
         try_count -= 1
 
-    logs = call_airflow_api("get", log_uri, {}).text
+    logs = call_airflow_api("get", log_uri, {}).text.encode('raw_unicode_escape').decode('utf-8')
 
     job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
     if job_item.job_mode == 1:  # normal model, one job-> many runs
-        run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
+        run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'],job_id=job_item.id, db=db)
     elif job_item.job_mode == 2:  # debug model, one job-> one run
         run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
     else:
@@ -185,10 +185,10 @@ def get_airflow_dagrun_running_status(job_id: int, af_run_id: str):
 def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
     state_uri = f"dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances/{task_id}"
     log_uri = f"{state_uri}/logs/1"
-
     job_item = crud.get_airflow_job_once(db=db, item_id=job_id)
+    print(f'job_mode type is {job_item.job_mode}, af_run_id in')
     if job_item.job_mode == 1:  # normal model, one job-> many runs
-        run = crud.get_airflow_run_once_normal_mode(af_run_id=af_run_id, db=db)
+        run = crud.get_airflow_run_once_normal_mode(af_run_id=af_run_id, job_id=job_id,db=db)
     elif job_item.job_mode == 2:  # debug model, one job-> one run
         run = crud.get_airflow_run_once_debug_mode(job_id=job_id, db=db)
     else:
@@ -202,9 +202,9 @@ def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str, db: S
                 return None
             update_run = schemas.AirflowRunUpdate(
                 **{"details": run.details, "status": run.status, "af_run_id": af_run_id})
-            print(f'stat is {state_ret.json()}')
+
             task_info = {
-                "log": log_ret.text,
+                "log": log_ret.text.encode('raw_unicode_escape').decode('utf-8'),
                 "status": state_ret.json()['state'],
                 "execution_time": datetime2timestamp(state_ret.json()['execution_date']),
                 "start_time": datetime2timestamp(state_ret.json()['start_date']),

+ 1 - 0
server.py

@@ -1,3 +1,4 @@
+
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from app.core.airflow.job import AirflowJobSubmitter