浏览代码

判断转化是否完成逻辑修改

liweiquan 2 年之前
父节点
当前提交
9cf55e9532
共有 3 个文件被更改,包括 9 次插入6 次删除
  1. 3 2
      app/services/dag.py
  2. 3 2
      app/services/datax.py
  3. 3 2
      app/services/jm_job.py

+ 3 - 2
app/services/dag.py

@@ -93,13 +93,14 @@ def dag_job_submit(dag_uuid:str,dag_script: str,db: Session):
         af_job = dag_create_job(dag_uuid, dag_script, db)
     else:
         af_job = dag_update_job(dag_uuid, dag_script, db)
-    current_time = int(time.time())
+    get_job_last_parsed_time(af_job['id'])
+    current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None
     send_submit(af_job['id'])
     for i in range(0,21):
         time.sleep(2)
         res = get_job_last_parsed_time(af_job['id'])
         last_parsed_time = res['data']['last_parsed_time']
-        if last_parsed_time and current_time < int(last_parsed_time):
+        if last_parsed_time and current_time != last_parsed_time:
             send_pause(af_job['id'],1)
             send_execute(af_job['id'])
             print(f"{af_job['id']}<==执行成功==>{last_parsed_time}")

+ 3 - 2
app/services/datax.py

@@ -127,12 +127,13 @@ def on_off_control(af_job_id: int,status: int):
         time.sleep(2)
 
 def execute_job(af_job_id: int):
-    current_time = int(time.time())
+    get_job_last_parsed_time(af_job_id)
+    current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None
     send_submit(af_job_id)
     for i in range(0,21):
         parsed_res = get_job_last_parsed_time(af_job_id)
         last_parsed_time = parsed_res['data']['last_parsed_time']
-        if last_parsed_time and int(last_parsed_time) > current_time:
+        if last_parsed_time and last_parsed_time != current_time:
             res = send_execute(af_job_id)
             print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
             return res

+ 3 - 2
app/services/jm_job.py

@@ -222,12 +222,13 @@ def on_off_control(af_job_id: int,status: int):
         time.sleep(2)
 
 def execute_job(af_job_id: int):
-    current_time = int(time.time())
+    get_job_last_parsed_time(af_job_id)
+    current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None
     send_submit(af_job_id)
     for i in range(0,21):
         parsed_res = get_job_last_parsed_time(af_job_id)
         last_parsed_time = parsed_res['data']['last_parsed_time']
-        if last_parsed_time and int(last_parsed_time) > current_time:
+        if last_parsed_time and last_parsed_time != current_time:
             res = send_execute(af_job_id)
             print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
             return res