Browse Source

1. 更新日志方式查询方式

luoyulong 2 years ago
parent
commit
2d62ad53ff
4 changed files with 36 additions and 23 deletions
  1. 2 1
      app/models/database.py
  2. 31 22
      app/routers/run.py
  3. 2 0
      development.ini
  4. 1 0
      production.ini

+ 2 - 1
app/models/database.py

@@ -24,11 +24,12 @@ engine = create_engine(
 
 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
 
-logger.info(f" connect to mysql success: {SQLALCHEMY_DATABASE_URL}")
+# logger.info(f" connect to mysql success: {SQLALCHEMY_DATABASE_URL}")
 
 Base = declarative_base()
 
 
+
 class BaseModel(Base):
     __abstract__ = True
 

+ 31 - 22
app/routers/run.py

@@ -183,30 +183,39 @@ def get_airflow_dagrun_running_status(job_id: int, af_run_id: str, db: Session =
         }
         # print(f"{task['task_id']}:{task['duration']}")
 
-    # item = schemas.AirflowRunUpdate(**{#"start_time": item.data["start_time"],
-    #                                    #"job_id": int(job_id),
-    #                                    # "run_ts": item.data['run_ts'],
-    #                                    # "af_run_id": item.data['af_run_id'],
-    #                                     "end_time":datetime2timestamp()
-    #                                    "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
-    #                                                                            "sparks": sparks_dependence}},
-    #                                    "status": 0},
-
-    # item = schemas.AirflowRunCreate(**{"start_time": item.data["start_time"],
-    #                                    "job_id": int(job_id),
-    #                                    # "run_ts": item.data['run_ts'],
-    #                                    # "af_run_id": item.data['af_run_id'],
-    #                                    "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
-    #                                                                            "sparks": sparks_dependence}},
-    #                                    "status": 0},
-
-    # return ret.json()
+
 
 
 @router_af_run.get("/task_log/{job_id}/{af_run_id}/{task_id}")
 @web_try()
 @sxtimeit
-def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str):
-    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances/{task_id}/logs/1',
-                           args_dict={})
-    return {"log": ret.text}
+def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
+    state_uri = f"dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances/{task_id}"
+    log_uri = f"{state_uri}/logs/1"
+
+    job_item = crud.get_airflow_job_once(db=db, item_id=job_id)
+    if job_item.job_mode == 1:  # normal model, one job-> many runs
+        run = crud.get_airflow_run_once_normal_mode(af_run_id=af_run_id, db=db)
+    elif job_item.job_mode == 2:  # debug model, one job-> one run
+        run = crud.get_airflow_run_once_debug_mode(job_id=job_id, db=db)
+    else:
+        run = None
+
+    if run is not None:
+        if run.details['tasks'].get(task_id, {}).get("status", "running") not in ["success", "failed"]:
+            state_ret = call_airflow_api(method='get', uri=state_uri, args_dict={})
+            log_ret = call_airflow_api(method='get', uri=log_uri, args_dict={})
+            if state_ret.status_code != 200 or log_ret.status_code != 200:
+                raise Exception('cant found the information of this task,please check your input ')
+            update_run = schemas.AirflowRunUpdate(
+                **{"details": run.details, "status": run.status, "af_run_id": af_run_id})
+            task_info = {
+                "log": log_ret.text, "status": state_ret.json()['state'],
+                "start_time": datetime2timestamp(state_ret.json()['start_date']),
+                "end_time": datetime2timestamp(state_ret.json()['end_date']),
+            }
+            update_run.details['tasks'][task_id] = task_info
+            crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
+            return task_info
+        else:
+            return run.details['tasks'][task_id]

+ 2 - 0
development.ini

@@ -19,6 +19,8 @@ host=192.168.199.109
 port=18082
 dag_files_dir=/dags/
 
+
+
 [BACKEND]
 url=192.168.199.107:18082
 

+ 1 - 0
production.ini

@@ -13,6 +13,7 @@ access_key = minioadmin
 secret_key = minioadmin
 
 
+
 [AF_BACKEND]
 uri=aihub-backend-af-yili-test:8080
 host=aihub-backend-af-yili-test