Browse Source

bug修复

liweiquan 2 years ago
parent
commit
de8a9dca72
6 changed files with 36 additions and 36 deletions
  1. 3 0
      Dockerfile
  2. 4 4
      app/routers/jm_job_info.py
  3. 20 23
      app/routers/jm_job_log.py
  4. 2 2
      app/services/datax.py
  5. 3 3
      app/services/jm_job.py
  6. 4 4
      app/utils/cron_utils.py

+ 3 - 0
Dockerfile

@@ -35,6 +35,9 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
 RUN locale-gen zh_CN.UTF-8
 RUN dpkg-reconfigure locales
 
+ENV TZ Asia/Shanghai
+RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime &&\
+    echo "Asia/Shanghai" > /etc/timezone
 
 CMD ["supervisord", "-n"]
 

+ 4 - 4
app/routers/jm_job_info.py

@@ -45,9 +45,9 @@ def get_jm_job_infos(db: Session = Depends(get_db)):
     af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
     res = {}
     for af_job_run in af_job_runs:
-        tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
-        if len(tasks) > 0:
-            task = tasks[-1]
+        if af_job_run.status in ['2','3']:
+            tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
+            task = tasks[-1] if len(tasks)>0 else None
             task.pop('log',None)
             job_id = af_to_datax[int(af_job_run.job_id)]
             log = {
@@ -59,7 +59,6 @@ def get_jm_job_infos(db: Session = Depends(get_db)):
                 "trigger_result": 1 if task else 0,
                 "execute_time": task['start_time'] if task else 0,
                 "execute_result": af_job_run.status,
-                "end_time": task['end_time'] if task else 0,
             }
             if job_id in res.keys():
                 res[job_id].append(log)
@@ -147,6 +146,7 @@ def get_cron_next_execute(cron_expression: str):
 
 def run_get_next_time(cron_expression):
     now = datetime.datetime.now()
+    print("====",now)
     cron_str = cron_expression.replace('?','*')
     cron = croniter.croniter(cron_str, now)
     execute_list = []

+ 20 - 23
app/routers/jm_job_log.py

@@ -42,29 +42,26 @@ def get_job_logs(job_id: int = None, params: Params=Depends(get_page), db: Sessi
     res = []
     for af_job_run in af_job_runs:
         tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
-        if len(tasks) > 0:
-            task = tasks[-1]
-            task.pop('log',None)
-            job_id = af_to_datax[int(af_job_run.job_id)]
-            execute_result = None
-            if af_job_run.status <= 1:
-                run_status = get_job_run_status(af_job_run.id)
-                execute_result = run_status['data']['status']
-            log = {
-                "id": af_job_run.id,
-                "job_id": job_id,
-                "job_name": id_to_job[job_id].name,
-                "job_type": id_to_job[job_id].type,
-                "job_tag": id_to_job[job_id].tag,
-                "af_job_id": int(af_job_run.job_id),
-                "run_id": af_job_run.af_run_id,
-                "trigger_time": af_job_run.start_time,
-                "trigger_result": 1,
-                "execute_time": task['start_time'] if task else 0,
-                "execute_result": execute_result if execute_result else af_job_run.status,
-                "end_time": task['end_time'] if task else 0,
-            }
-            res.append(log)
+        task = tasks[-1] if len(tasks) > 0 else None
+        job_id = af_to_datax[int(af_job_run.job_id)]
+        execute_result = None
+        if af_job_run.status <= 1:
+            run_status = get_job_run_status(af_job_run.id)
+            execute_result = run_status['data']['status']
+        log = {
+            "id": af_job_run.id,
+            "job_id": job_id,
+            "job_name": id_to_job[job_id].name,
+            "job_type": id_to_job[job_id].type,
+            "job_tag": id_to_job[job_id].tag,
+            "af_job_id": int(af_job_run.job_id),
+            "run_id": af_job_run.af_run_id,
+            "trigger_time": af_job_run.start_time,
+            "trigger_result": 1,
+            "execute_time": task['start_time'] if task else None,
+            "execute_result": execute_result if execute_result else af_job_run.status,
+        }
+        res.append(log)
     return page_help(res,params['page'],params['size'],total)
 
 @router.get("/logs")

+ 2 - 2
app/services/datax.py

@@ -7,7 +7,7 @@ from sqlalchemy.orm import Session
 def datax_create_job(job_info: models.JobInfo, db: Session):
     af_task = datax_create_task(job_info)
     cron: str = job_info.job_cron
-    cron.replace('?','*')
+    cron = cron.replace('?','*')
     af_job = {
         "tasks": [af_task],
         "name": job_info.job_desc,
@@ -67,7 +67,7 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     old_af_task = old_af_job['tasks'][0]
     af_task = datax_put_task(job_info,old_af_task)
     cron: str = job_info.job_cron
-    cron.replace('?','*')
+    cron = cron.replace('?','*')
     af_job = {
         "tasks": [af_task],
         "name": job_info.job_desc,

+ 3 - 3
app/services/jm_job.py

@@ -69,7 +69,7 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
     dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
-    cron.replace('?','*')
+    cron = cron.replace('?','*')
     af_job = {
         "tasks": tasks,
         "name": jm_job_info.name,
@@ -99,7 +99,7 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
     dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
-    cron.replace('?','*')
+    cron = cron.replace('?','*')
     af_job = {
         "tasks": tasks,
         "name": jm_job_info.name,
@@ -220,7 +220,7 @@ def on_off_control(af_job_id: int,status: int):
             print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")
             break
         if i >= 10:
-            raise Exception(f"{af_job_id}==>执行失败")
+            raise Exception(f"{af_job_id}==>状态修改失败")
         time.sleep(2)
 
 def execute_job(af_job_id: int):

+ 4 - 4
app/utils/cron_utils.py

@@ -65,10 +65,10 @@ def check_cron_expression(cron_expression):
     unit_list = ['minute', 'hour', 'day', 'month', 'week']
     reg_list = [
         "^((([0-9]|[0-5][0-9])(\\,|\\-|\\/){1}([0-9]|[0-5][0-9]))|([0-9]|[0-5][0-9])|(\\*))$",
-        "^((([0-9]|[01][0-9]|2[0-3])(\\,|\\-|\\/){1}([0-9]|[01][0-9]|2[0-3]))|([0-9]|[01][0-9]|2[0-3])|(\\*))$",
-        "^((([0-9]|[0-2][0-9]|3[01])(\\,|\\-|\\/){1}([0-9]|[0-2][0-9]|3[01]))|([0-9]|[0-2][0-9]|3[01])|(\\*)|(\\?))$",
-        "^((([0-9]|[0-1][0-2])(\\,|\\-|\\/){1}([0-9]|[0-1][0-2]))|([0-9]|[0-1][0-2])|(\\*))$",
-        "^((([1-9]|0[1-9]|1[0-2])(\\,|\\-|\\/){1}([1-9]|0[1-9]|1[0-2]))|([1-9]|0[1-9]|1[0-2])|(\\*)|(\\?))$"
+        "^((([0-9]|[1][0-9]|2[0-3])(\\,|\\-|\\/){1}([0-9]|[1][0-9]|2[0-3]))|([0-9]|[1][0-9]|2[0-3])|(\\*))$",
+        "^((([1-9]|[1-2][0-9]|3[01])(\\,|\\-|\\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\\*)|(\\?))$",
+        "^((([1-9]|[1][0-2])(\\,|\\-|\\/){1}([1-9]|[1][0-2]))|([1-9]|[1][0-2])|(\\*))$",
+        "^((([1-7])(\\,|\\-|\\/){1}([1-7]))|([1-7])|(\\*)|(\\?))$"
         ]
     for cron, unit, reg in zip(cron_list, unit_list, reg_list):
         match_obj = re.match(reg, cron)