瀏覽代碼

多作业,起停,删除

liweiquan 2 年之前
父節點
當前提交
15acb7b998
共有 6 個文件被更改,包括 44 次插入8 次删除
  1. 1 1
      app/crud/jm_job_info.py
  2. 3 1
      app/routers/jm_job_info.py
  3. 2 0
      app/routers/job_info.py
  4. 8 2
      app/services/datax.py
  5. 10 3
      app/services/jm_job.py
  6. 20 1
      app/utils/send_util.py

+ 1 - 1
app/crud/jm_job_info.py

@@ -75,10 +75,10 @@ def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
     find_and_update(db, '任务标签', tag)
     for k, v in jm_job_info_update.items():
         setattr(db_item, k, v)
-    jm_job_submit(db_item,db)
     db.commit()
     db.flush()
     db.refresh(db_item)
+    jm_job_submit(db_item,db)
     return db_item,nodes,edges
 
 def delete_jm_job_info(db: Session, jm_job_id: int):

+ 3 - 1
app/routers/jm_job_info.py

@@ -10,7 +10,7 @@ from app import models, schemas
 import app.crud as crud
 from app.schemas import cron_expression
 from app.utils.cron_utils import *
-from app.utils.send_util import send_execute
+from app.utils.send_util import send_delete, send_execute
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
@@ -136,6 +136,8 @@ def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_
 @web_try()
 @sxtimeit
 def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
+    relation = crud.get_af_id(db, jm_job_id, 'job')
+    send_delete('/jpt/af_job', relation.af_id)
     return crud.delete_jm_job_info(db,jm_job_id)
 
 @router.put("/status")

+ 2 - 0
app/routers/job_info.py

@@ -100,6 +100,8 @@ def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depe
 @web_try()
 @sxtimeit
 def delete_job_info(job_id: int, db: Session = Depends(get_db)):
+    relation = crud.get_af_id(db, job_id, 'datax')
+    send_delete('/jpt/af_job', relation.af_id)
     return crud.delete_job_info(db, job_id)
 
 @router.post("/execute")

+ 8 - 2
app/services/datax.py

@@ -5,9 +5,11 @@ from sqlalchemy.orm import Session
 
 def datax_create_job(job_info: models.JobInfo, db: Session):
     af_task = datax_create_task(job_info)
+    cron: str = job_info.job_cron
+    cron.replace('?','*')
     af_job = {
         "tasks": [af_task],
-        "name": job_info.job_desc,
+        "name": cron,
         "dependence": [],
         "cron": job_info.job_cron,
         "desc": job_info.job_desc,
@@ -24,6 +26,7 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
     af_job = res['data']
     crud.create_relation(db, job_info.id,'datax', af_job['id'])
     send_submit(af_job['id'])
+    send_pause(af_job['id'], True if job_info.trigger_status == 1 else False)
 
 def datax_create_task(job_info: models.JobInfo):
     cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
@@ -50,11 +53,13 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     old_af_job = res['data']
     old_af_task = old_af_job['tasks'][0]
     af_task = datax_put_task(job_info,old_af_task)
+    cron: str = job_info.job_cron
+    cron.replace('?','*')
     af_job = {
         "tasks": [af_task],
         "name": job_info.job_desc,
         "dependence": [],
-        "cron": job_info.job_cron,
+        "cron": cron,
         "desc": job_info.job_desc,
         "route_strategy": job_info.executor_route_strategy,
         "block_strategy": job_info.executor_block_strategy,
@@ -65,6 +70,7 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     res = send_put('/jpt/af_job', old_af_job['id'], af_job)
     af_job = res['data']
     send_submit(af_job['id'])
+    send_pause(af_job['id'], True if job_info.trigger_status == 1 else False)
 
 
 def datax_put_task(job_info: models.JobInfo,old_af_task):

+ 10 - 3
app/services/jm_job.py

@@ -72,11 +72,13 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
     tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
     edges = crud.get_one_job_edges(db, jm_job_info.id)
     dependence = [[se_id_to_af_id_dict[edge['in_node_id']],se_id_to_af_id_dict[str(edge['out_node_id'])]] for edge in edges]
+    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
+    cron.replace('?','*')
     af_job = {
         "tasks": tasks,
         "name": jm_job_info.name,
         "dependence": dependence,
-        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@onec',
+        "cron": cron,
         "desc": jm_job_info.name,
         "route_strategy": "",
         "block_strategy": "",
@@ -91,21 +93,25 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
     af_job = res['data']
     crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
     send_submit(af_job['id'])
+    send_pause(af_job['id'], True if jm_job_info.status == 1 else False)
 
 
 def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
     nodes = crud.get_one_job_nodes(db, jm_job_info.id)
     homework_ids = [node.homework_id for node in nodes]
+    node_id_to_h_id = {node.id:node.homework_id for node in nodes}
     relations = crud.get_af_ids(db,homework_ids, 'task')
     se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
     tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
     edges = crud.get_one_job_edges(db, jm_job_info.id)
-    dependence = [[se_id_to_af_id_dict[edge['in_node_id']],se_id_to_af_id_dict[str(edge['out_node_id'])]] for edge in edges]
+    dependence = [[se_id_to_af_id_dict[node_id_to_h_id[edge.in_node_id]],se_id_to_af_id_dict[node_id_to_h_id[edge.out_node_id]]] for edge in edges]
+    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
+    cron.replace('?','*')
     af_job = {
         "tasks": tasks,
         "name": jm_job_info.name,
         "dependence": dependence,
-        "cron": jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@onec',
+        "cron": cron,
         "desc": jm_job_info.name,
         "route_strategy": "",
         "block_strategy": "",
@@ -117,6 +123,7 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
     res = send_put('/jpt/af_job', job_relation.af_id, af_job)
     af_job = res['data']
     send_submit(af_job['id'])
+    send_pause(af_job['id'], True if jm_job_info.status == 1 else False)
 
 def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
     job_relation = crud.get_af_id(db,jm_job_info.id,'job')

+ 20 - 1
app/utils/send_util.py

@@ -47,4 +47,23 @@ def send_execute(path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        raise Exception('执行一次任务,请求airflow失败-->'+result['msg'])
+        raise Exception('执行一次任务,请求airflow失败-->'+result['msg'])
+
+# 起停任务
+def send_pause(af_job_id, status):
+    res = requests.patch(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(af_job_id)}/pause/{str(status)}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        raise Exception('修改任务状态,请求airflow失败-->'+result['msg'])
+
+# 删除任务
+def send_delete(uri, path_data):
+    res = requests.delete(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        print(result)
+        raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])