Quellcode durchsuchen

同步任务筛选

liweiquan vor 2 Jahren
Ursprung
Commit
4ddd46a032
4 geänderte Dateien mit 12 neuen und 7 gelöschten Zeilen
  1. 6 2
      app/routers/job_log.py
  2. 2 2
      app/services/datax.py
  3. 2 2
      app/services/jm_job.py
  4. 2 1
      app/utils/send_util.py

+ 6 - 2
app/routers/job_log.py

@@ -28,8 +28,12 @@ def create_job_log(item: schemas.JobLogCreate, db: Session = Depends(get_db)):
 @router.get("/")
 @web_try()
 @sxtimeit
-def get_job_logs(params: Params = Depends(), db: Session = Depends(get_db)):
-    job_infos = crud.get_job_infos(db)
+def get_job_logs(job_id: Optional[int] = None, params: Params = Depends(), db: Session = Depends(get_db)):
+    job_infos = []
+    if job_id is None:
+        job_infos = crud.get_job_infos(db)
+    else:
+        job_infos = [crud.get_job_info(db,job_id)]
     id_to_job = {job.id:job for job in job_infos}
     relations = crud.get_af_ids(db, id_to_job.keys(), 'datax')
     af_to_datax = {relation.af_id:relation.se_id for relation in relations}

+ 2 - 2
app/services/datax.py

@@ -26,7 +26,7 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
     af_job = res['data']
     crud.create_relation(db, job_info.id,'datax', af_job['id'])
     send_submit(af_job['id'])
-    send_pause(af_job['id'], True if job_info.trigger_status == 1 else False)
+    send_pause(af_job['id'], job_info.trigger_status)
 
 def datax_create_task(job_info: models.JobInfo):
     cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
@@ -70,7 +70,7 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     res = send_put('/jpt/af_job', old_af_job['id'], af_job)
     af_job = res['data']
     send_submit(af_job['id'])
-    send_pause(af_job['id'], True if job_info.trigger_status == 1 else False)
+    send_pause(af_job['id'], job_info.trigger_status)
 
 
 def datax_put_task(job_info: models.JobInfo,old_af_task):

+ 2 - 2
app/services/jm_job.py

@@ -93,7 +93,7 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
     af_job = res['data']
     crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
     send_submit(af_job['id'])
-    send_pause(af_job['id'], True if jm_job_info.status == 1 else False)
+    send_pause(af_job['id'], jm_job_info.status)
 
 
 def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
@@ -123,7 +123,7 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
     res = send_put('/jpt/af_job', job_relation.af_id, af_job)
     af_job = res['data']
     send_submit(af_job['id'])
-    send_pause(af_job['id'], True if jm_job_info.status == 1 else False)
+    send_pause(af_job['id'],jm_job_info.status)
 
 def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
     job_relation = crud.get_af_id(db,jm_job_info.id,'job')

+ 2 - 1
app/utils/send_util.py

@@ -51,7 +51,8 @@ def send_execute(path_data):
 
 # 起停任务
 def send_pause(af_job_id, status):
-    res = requests.patch(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(af_job_id)}/pause/{str(status)}')
+    flag = True if status == 0 else False
+    res = requests.patch(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(af_job_id)}/pause/{str(flag)}')
     result = res.json()
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()