
Sync configuration log fetching; add error messages when adding a data source

liweiquan · 2 years ago
Parent commit 65d5038752

+ 10 - 5
app/crud/af_run.py

@@ -23,11 +23,18 @@ def get_airflow_runs(db: Session):
     return res
 
 
-def get_airflow_runs_by_af_job_ids(db: Session, job_ids: List[int], start: int, end: int):
-    res: List[models.AirflowRun] = db.query(models.AirflowRun) \
-        .filter(models.AirflowRun.job_id.in_(job_ids))\
-        .order_by(models.AirflowRun.start_time.desc())\
-        .slice(start,end).all()
+def get_airflow_runs_by_af_job_ids(db: Session, job_ids: List[int], start: int = None, end: int = None):
+    res: List[models.AirflowRun] = []
+    if start is None or end is None:
+        # No pagination bounds supplied: return every matching run
+        res = db.query(models.AirflowRun) \
+            .filter(models.AirflowRun.job_id.in_(job_ids)).all()
+    else:
+        # Paginate in the database, newest runs first
+        res = db.query(models.AirflowRun) \
+            .filter(models.AirflowRun.job_id.in_(job_ids)) \
+            .order_by(models.AirflowRun.start_time.desc()) \
+            .slice(start, end).all()
     return res
 
 

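Note: the log routers changed below call crud.count_airflow_runs_by_job_ids, which does not appear in this diff. A minimal sketch of what that helper presumably looks like, assuming the same models.AirflowRun table and this module's existing imports:

    def count_airflow_runs_by_job_ids(db: Session, job_ids: List[int]) -> int:
        # Count matching rows in SQL instead of loading them into memory
        return db.query(models.AirflowRun) \
            .filter(models.AirflowRun.job_id.in_(job_ids)) \
            .count()
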
+ 5 - 1
app/crud/job_jdbc_datasource.py

@@ -20,7 +20,12 @@ def _format_datasource(db: Session, item: schemas.JobJdbcDatasourceBase, ds_id:
             raise Exception('未找到该数据源')
         item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
         item.jdbc_username, item.jdbc_password = decode_user(item.jdbc_username, item.jdbc_password)
-    host, port = item.jdbc_url.split(':')
+    try:
+        host, port = item.jdbc_url.split(':')
+    except ValueError:
+        # Unpacking fails unless the address contains exactly one ':'
+        raise Exception('数据库地址填写错误')
+
     if not host or not port:
         raise Exception('jdbc_url无效')
     ds = None
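
For context on the new try/except: split(':') itself never raises here; the ValueError comes from the tuple unpacking when the address does not contain exactly one ':'. A quick illustration with hypothetical values:

    host, port = 'localhost:3306'.split(':')       # OK -> ('localhost', '3306')
    host, port = 'localhost'.split(':')            # ValueError: not enough values to unpack
    host, port = 'jdbc:mysql://h:3306'.split(':')  # ValueError: too many values to unpack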

+ 2 - 6
app/routers/jm_job_log.py

@@ -38,12 +38,8 @@ def get_job_logs(job_id: int = None, params: Params=Depends(get_page), db: Sessi
     relations = crud.get_af_ids(db,id_to_job.keys(), 'job')
     af_to_datax = {relation.af_id:relation.se_id for relation in relations}
     # Fetch the task run records
-    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
-    # Sort by start time
-    af_job_runs.sort(key=lambda x: x.start_time, reverse=True)
-    total = len(af_job_runs)
-    # Paginate in memory
-    af_job_runs = af_job_runs[(params['page'] - 1) * params['size']:params['page'] * params['size']]
+    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys(), (params['page'] - 1) * params['size'], params['page'] * params['size'])
+    total = crud.count_airflow_runs_by_job_ids(db, af_to_datax.keys())
     res = []
     for af_job_run in af_job_runs:
         job_id = af_to_datax[int(af_job_run.job_id)]
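
The in-memory sort-and-slice is replaced by database-side pagination. A worked example of the bounds arithmetic, assuming the 1-based page numbers produced by get_page:

    page, size = 2, 10
    start = (page - 1) * size   # 10
    end = page * size           # 20
    # Query.slice(10, 20) then compiles to LIMIT 10 OFFSET 10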

+ 0 - 12
app/routers/job_log.py

@@ -33,18 +33,10 @@ def get_job_logs(job_id: Optional[int] = None, params: Params=Depends(get_page),
     relations = crud.get_af_ids(db, id_to_job.keys(), 'datax')
     af_to_datax = {relation.af_id:relation.se_id for relation in relations}
     # Fetch the run records
-    print(f'获取运行记录{af_to_datax.keys()}')
     af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys(), (params['page'] - 1) * params['size'], params['page'] * params['size'])
     total = crud.count_airflow_runs_by_job_ids(db, af_to_datax.keys())
-    # # Sort by start time
-    # print('根据时间排序')
-    # af_job_runs.sort(key=lambda x: x.start_time, reverse=True)
-    # total = len(af_job_runs)
-    # # Paginate
-    # af_job_runs = af_job_runs[(params['page'] - 1) * params['size']:params['page'] * params['size']]
     res = []
     # Loop over the runs and fetch each log
-    print('循环获取日志')
     for af_job_run in af_job_runs:
         job_id = af_to_datax[int(af_job_run.job_id)]
         print(f'job_id==>{job_id}')
@@ -52,16 +44,12 @@ def get_job_logs(job_id: Optional[int] = None, params: Params=Depends(get_page),
         job_log = None
         if len(af_job_run.details['tasks']) > 0:
             job_log = list(af_job_run.details['tasks'].values())[0]
-            print('已存在运行记录')
         else:
-            print('不存在运行记录')
             af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
             task = list(af_job.tasks)[0] if len(list(af_job.tasks))>0 else None
             print(f"datax任务的作业{task['id']}")
             log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
-            print(f'log_res==>{log_res}')
             job_log = log_res['data'] if 'data' in log_res.keys() else None
-        print(f'job_log==>{job_log}')
         log = {
             "id": af_job_run.id,
             "job_id": job_id,

+ 1 - 1
constants/constants.py

@@ -8,4 +8,4 @@ CONSTANTS = {
     'DATASOURCES': DATASOURCES
 }
 
-RUN_STATUS = {"queued": 0, 'scheduled': 1, 'running': 1, 'success': 2, 'failed': 3, 'upstream_failed': 3}
+RUN_STATUS = {"queued": 0, 'scheduled': 1, 'running': 1, 'success': 2, 'failed': 3, 'skipped': 3, 'upstream_failed': 3}
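
The added 'skipped' entry folds skipped Airflow states into the same app-level code as failures (0 = queued, 1 = in progress, 2 = success, 3 = failed). A hypothetical lookup helper, assuming unmapped states should default to queued rather than raise a KeyError:

    def to_app_status(airflow_state: str) -> int:
        # Hypothetical helper; falls back to 0 (queued) for unmapped states
        return RUN_STATUS.get(airflow_state, 0)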