Selaa lähdekoodia

数据源关系、存执删逻辑修改

liweiquan 2 vuotta sitten
vanhempi
commit
7aeb86fe58

+ 15 - 52
app/crud/jm_homework.py

@@ -4,35 +4,14 @@ from app import models, schemas
 from sqlalchemy.orm import Session
 from app.crud.constant import find_and_update
 
-from app.crud.jm_homework_datasource_relation import create_jm_hd_relation, delete_jm_relations, get_jm_relations
-from app.services.jm_job import jm_homework_submit
+from app.crud.jm_homework_datasource_relation import get_jm_relations
 
 
-def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
-    jm_homework_create = item.dict()
-    db_item = db.query(models.JmHomework).filter(models.JmHomework.name == jm_homework_create['name'])\
-        .filter(models.JmHomework.status != 0).first()
-    if db_item:
-        raise Exception('作业名称已存在')
-    relation_list = []
-    if 'relation_list' in jm_homework_create.keys():
-        relation_list = jm_homework_create.pop('relation_list')
-    tag = jm_homework_create['tag']
-    find_and_update(db, '作业标签', tag)
-    create_time: int = int(time.time())
-    db_item = models.JmHomework(**jm_homework_create,**{
-        'create_time': create_time,
-        'update_time': create_time,
-        'status': 1
-    })
-    db.add(db_item)
+def create_jm_homework(db: Session, item: models.JmHomework):
+    db.add(item)
     db.commit()
-    db.refresh(db_item)
-    if jm_homework_create['type'] == 'Dag' and relation_list is not None:
-        for relation in relation_list:
-            create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
-    jm_homework_submit(db_item, db)
-    return db_item.to_dict()
+    db.refresh(item)
+    return item
 
 def get_jm_homeworks(db: Session, project_id: str):
     res: List[models.JmHomework] = db.query(models.JmHomework)\
@@ -49,34 +28,18 @@ def get_jm_homework_info(db: Session, homework_id: int):
         item.__dict__.update({"hd_relation":relations})
     return item
 
-def update_jm_homework(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
-    jm_homework_update =update_item.dict(exclude_unset=True)
-    db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
-    if not db_item:
-        raise Exception('未找到该作业')
-    db_name_item = db.query(models.JmHomework)\
-        .filter(models.JmHomework.name == jm_homework_update['name'])\
-        .filter(models.JmHomework.status != 0)\
-        .filter(models.JmHomework.id != id).first()
-    if db_name_item:
-        raise Exception('作业名称已存在')
-    relation_list = []
-    if 'relation_list' in jm_homework_update.keys():
-        relation_list = jm_homework_update.pop('relation_list')
-    tag = jm_homework_update['tag']
-    find_and_update(db, '作业标签', tag)
-    for k, v in jm_homework_update.items():
-        setattr(db_item, k, v)
-    db_item.update_time = int(time.time())
+
+def get_jm_homework_by_name(db: Session, name: str):
+    db_item = db.query(models.JmHomework).filter(models.JmHomework.name == name)\
+        .filter(models.JmHomework.status != 0).first()
+    return db_item
+
+
+def update_jm_homework(db: Session, id: int, update_item: models.JmHomework):
     db.commit()
     db.flush()
-    db.refresh(db_item)
-    delete_jm_relations(db,db_item.id)
-    if jm_homework_update['type'] == 'Dag' and relation_list is not None:
-        for relation in relation_list:
-            create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
-    jm_homework_submit(db_item, db)
-    return db_item.to_dict()
+    db.refresh(update_item)
+    return update_item
 
 def delete_jm_homework(db: Session, id: int):
     db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()

+ 8 - 61
app/crud/jm_job_info.py

@@ -3,37 +3,11 @@ from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
 
-from app.crud.constant import find_and_update
-from app.utils.cron_utils import *
-from app.services.jm_job import jm_job_submit
-
-def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
-    jm_job_info_create = item.dict()
-    cron_expression_item = jm_job_info_create.pop('cron_expression', None)
-    if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
-        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
-        cron_select_type = cron_expression_item["cron_select_type"]
-        jm_job_info_create.update({
-            'cron_select_type': cron_select_type,
-            'cron_expression': cron_expression,
-        })
-    nodes = jm_job_info_create.pop('nodes', None)
-    edges = jm_job_info_create.pop('edges', None)
-    db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
-        .filter(models.JmJobInfo.delete_status != 0).first()
-    if db_item:
-        raise Exception('定时任务名称已存在')
-    tag = jm_job_info_create['tag']
-    find_and_update(db, '任务标签', tag)
-    jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
-        'status': 0,
-        'delete_status': 1,
-    })
+def create_jm_job_info(db: Session, jm_job_info: models.JmJobInfo):
     db.add(jm_job_info)
     db.commit()
     db.refresh(jm_job_info)
-    jm_job_submit(jm_job_info, db)
-    return jm_job_info,nodes,edges
+    return jm_job_info
 
 def get_jm_job_infos(db: Session):
     res: List[models.JmJobInfo] = db.query(models.JmJobInfo)\
@@ -49,37 +23,11 @@ def get_jm_job_info(db: Session, jm_job_id: int):
         raise Exception('未找到该定时任务')
     return item
 
-def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
-    jm_job_info_update = item.dict(exclude_unset=True)
-    cron_expression_item = jm_job_info_update.pop('cron_expression', None)
-    if jm_job_info_update['cron_type'] == 2:
-        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
-        cron_select_type = cron_expression_item["cron_select_type"]
-        jm_job_info_update.update({
-            'cron_select_type': cron_select_type,
-            'cron_expression': cron_expression,
-        })
-    nodes = jm_job_info_update.pop('nodes', None)
-    edges = jm_job_info_update.pop('edges', None)
-    db_item = db.query(models.JmJobInfo)\
-        .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
-    if not db_item:
-        raise Exception('未找到该定时任务')
-    db_name_item = db.query(models.JmJobInfo)\
-        .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
-        .filter(models.JmJobInfo.delete_status != 0)\
-        .filter(models.JmJobInfo.id != item.id).first()
-    if db_name_item:
-        raise Exception('定时任务名称已存在')
-    tag = jm_job_info_update['tag']
-    find_and_update(db, '任务标签', tag)
-    for k, v in jm_job_info_update.items():
-        setattr(db_item, k, v)
+def update_jm_job_info(db: Session, item: models.JmJobInfo):
     db.commit()
     db.flush()
-    db.refresh(db_item)
-    jm_job_submit(db_item,db)
-    return db_item,nodes,edges
+    db.refresh(item)
+    return item
 
 def delete_jm_job_info(db: Session, jm_job_id: int):
     jm_job_info = db.query(models.JmJobInfo)\
@@ -94,14 +42,13 @@ def delete_jm_job_info(db: Session, jm_job_id: int):
     db.refresh(jm_job_info)
     return jm_job_info
 
-def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
+def update_jm_job_status(db: Session, jm_job_id: int, status: int):
     jm_job_info = db.query(models.JmJobInfo)\
-        .filter(models.JmJobInfo.id == item.id)\
+        .filter(models.JmJobInfo.id == jm_job_id)\
         .filter(models.JmJobInfo.delete_status != 0).first()
     if not jm_job_info:
         raise Exception('未找到该定时任务')
-    jm_job_info.status = item.status
-    jm_job_submit(jm_job_info,db)
+    jm_job_info.status = status
     db.commit()
     db.flush()
     db.refresh(jm_job_info)

+ 8 - 63
app/crud/job_info.py

@@ -2,80 +2,25 @@ import time
 from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
-
+from app.services.datax import datax_create_job
 from app.utils.cron_utils import *
 
 
-def create_job_info(db: Session, item: schemas.JobInfoCreate):
-    create_time: int = int(time.time())
-    item_dict = item.dict()
-    cron_expression_dict = item_dict.pop('cron_expression')
-    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
-    cron_select_type = cron_expression_dict["cron_select_type"]
-    item_dict.update({
-        'cron_select_type': cron_select_type,
-        'job_cron': cron_expression,
-    })
-    partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() and item_dict['partition_info'] != '' else None
-    partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() and item_dict['partition_time'] != '' else None
-    partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() and item_dict['partition_num'] != '' else None
-    partition_info_str = ''
-    if partition_info is not None and partition_time is not None and partition_num is not None:
-        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
-    elif partition_info is not None and (partition_time is None or partition_num is None):
-        raise Exception('分区信息不完善')
-    item_dict.update({
-        'partition_info': partition_info_str,
-    })
-    db_item = models.JobInfo(**item_dict, **{
-        'trigger_status': 0,
-        'create_time': create_time,
-        'update_time': create_time,
-        'delete_status': 1,
-    })
-    db.add(db_item)
+def create_job_info(db: Session, item: models.JobInfo):
+    db.add(item)
     db.commit()
-    db.refresh(db_item)
-    return db_item
-
-
+    db.refresh(item)
+    return item
 
 def get_job_infos(db: Session):
     res: List[models.JobInfo] = db.query(models.JobInfo).filter(models.JobInfo.delete_status == 1).all()  # TODO: 排序
     return res
 
-
-
-def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate):
-    db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
-    if not db_item:
-        raise Exception('未找到该任务')
-    update_dict = update_item.dict(exclude_unset=True)
-    cron_expression_dict = update_dict.pop('cron_expression')
-    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
-    cron_select_type = cron_expression_dict["cron_select_type"]
-    update_dict.update({
-        'cron_select_type': cron_select_type,
-        'job_cron': cron_expression,
-    })
-    partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() and update_dict['partition_info'] != '' else None
-    partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys()  and update_dict['partition_time'] != '' else None
-    partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys()  and update_dict['partition_num'] != '' else None
-    partition_info_str = ''
-    if partition_info is not None and partition_time is not None and partition_num is not None:
-        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
-    elif partition_info is not None and (partition_time is None or partition_num is None):
-        raise Exception('分区信息不完善')
-    update_dict.update({
-        'partition_info': partition_info_str,
-    })
-    for k, v in update_dict.items():
-        setattr(db_item, k, v)
-    db_item.update_time = int(time.time())
+def update_job_info(db: Session, id: int, update_item: models.JobInfo):
     db.commit()
     db.flush()
-    db.refresh(db_item)
-    return db_item
+    db.refresh(update_item)
+    return update_item
 
 def update_job_trigger_status(db: Session, id: int, trigger_status: int):
     db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()

+ 1 - 1
app/routers/data_management.py

@@ -29,7 +29,7 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
-
+    current_time = int(time.time())
     table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
     tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
     af_run_id = data_transfer_run(tmp_table_name, table_name)

+ 3 - 5
app/routers/jm_homework.py

@@ -7,6 +7,7 @@ from sqlalchemy.orm import Session
 from app import schemas
 from app.common.hive import hiveDs
 from app.crud import jm_homework
+from app.services.jm_homework import create_jm_homework_services, update_jm_homework_service
 from app.services.jm_job import red_dag_and_format
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
@@ -27,9 +28,7 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def create_jm_homework(item: schemas.JmHomeworkCreate, db: Session = Depends(get_db)):
-    # 根据获取到的文件路径另存一份并改变
-    jm_homework = crud.create_jm_homework(db, item)
-    return jm_homework
+    return create_jm_homework_services(db, item)
 
 @router.get("/")
 @web_try()
@@ -47,8 +46,7 @@ def get_jm_homework_info(homework_id: str, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def update_jm_homework(jm_id: int, update_item: schemas.JmHomeworkUpdate, db: Session = Depends(get_db)):
-    # 根据获取到的文件路径另存一份并改变
-    return crud.update_jm_homework(db, jm_id, update_item)
+    return update_jm_homework_service(db, jm_id, update_item)
 
 @router.delete("/")
 @web_try()

+ 7 - 40
app/routers/jm_job_info.py

@@ -9,6 +9,8 @@ from app import models, schemas
 
 import app.crud as crud
 from app.schemas import cron_expression
+from app.services.jm_job import on_off_control
+from app.services.jm_job_info import create_jm_job_info_services, execute_job_services, update_jm_job_info_services, update_jm_job_status_services
 from app.utils.cron_utils import *
 from app.utils.send_util import send_delete, send_execute
 from utils.sx_time import sxtimeit
@@ -28,37 +30,8 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
-    jm_job_info,nodes,edges = crud.create_jm_job_info(db, item)
-    job_id = jm_job_info.id
-    create_jm_job_node(db, nodes, edges, job_id)
-    return jm_job_info.to_dict()
-
-
-def create_jm_job_node(db: Session, nodes, edges, job_id):
-    uuid_node_id = {}
-    if nodes is None or len(nodes) == 0:
-        return
-    for node in nodes:
-        uuid = node['id']
-        node_item = models.JmJobNode(**{
-            'job_id': job_id,
-            'homework_id': node['homework_id'],
-            'homework_name': node['homework_name'],
-            'start_point': 1,
-        })
-        node_item = crud.create_jm_job_node(db,node_item)
-        node_id = node_item.id
-        uuid_node_id.update({uuid:node_id})
-    if nodes is None or len(nodes) == 0:
-        return
-    for edge in edges:
-        edge_item = models.JmJobEdge(**{
-            'job_id': job_id,
-            'in_node_id': uuid_node_id[edge['source']],
-            'out_node_id': uuid_node_id[edge['target']]
-        })
-        edge = crud.create_jm_job_edge(db,edge_item)
-    return
+    return create_jm_job_info_services(db, item)
+
 
 @router.get("/")
 @web_try()
@@ -126,12 +99,7 @@ def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
-    jm_job_info,nodes,edges = crud.update_jm_job_info(db, item)
-    job_id = jm_job_info.id
-    crud.delete_job_node(db, job_id)
-    job_id = jm_job_info.id
-    create_jm_job_node(db, nodes, edges, job_id)
-    return jm_job_info.to_dict()
+    return update_jm_job_info_services(db, item)
 
 @router.delete("/")
 @web_try()
@@ -145,7 +113,7 @@ def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
-    return crud.update_jm_job_status(db,item)
+    return update_jm_job_status_services(db, item.id, item.status)
 
 @router.post("/execute/{jm_job_id}")
 @web_try()
@@ -154,8 +122,7 @@ def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
     jm_job = crud.get_jm_job_info(db,jm_job_id)
     if jm_job.status == 0:
         raise Exception('任务已被停用')
-    relation = crud.get_af_id(db, jm_job_id, 'job')
-    res = send_execute(relation.af_id)
+    res = execute_job_services(db,jm_job_id)
     return res['data']
 
 

+ 6 - 14
app/routers/job_info.py

@@ -9,7 +9,8 @@ from app import models, page_help, schemas
 
 import app.crud as crud
 from app.crud import job_info
-from app.services.datax import datax_create_job, datax_update_job
+from app.services.datax import datax_create_job, datax_update_job, on_off_control
+from app.services.job_info import create_job_info_services, execute_job_services, update_job_info_services
 from app.utils.cron_utils import parsing_cron_expression
 from app.utils.send_util import *
 from app.utils.utils import *
@@ -32,9 +33,7 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
-    return crud.create_job_info(db, item)
-
-
+    return create_job_info_services(db,item)
 
 @router.get("/")
 @web_try()
@@ -68,14 +67,11 @@ def get_job_info(job_id: int, db: Session = Depends(get_db)):
         })
     return job_info_dict
 
-
-
 @router.put("/{id}")
 @web_try()
 @sxtimeit
 def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
-    job_info = crud.update_job_info(db, id, update_item)
-    return job_info
+    return update_job_info_services(db, id, update_item)
 
 @router.put("/update_trigger_status/")
 @web_try()
@@ -84,10 +80,7 @@ def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depe
     job_info = crud.get_job_info(db, item.id)
     relation = crud.get_af_id(db, job_info.id, 'datax')
     job_info.trigger_status = item.trigger_status
-    if not relation:
-        datax_create_job(job_info,db)
-    else:
-        datax_update_job(job_info,db)
+    on_off_control(relation.af_id, item.trigger_status)
     job_info = crud.update_job_trigger_status(db, item.id, item.trigger_status)
     return job_info
 
@@ -106,8 +99,7 @@ def execute_job_info(job_id: int, db: Session = Depends(get_db)):
     jm_job = crud.get_job_info(db, job_id)
     if jm_job.trigger_status == 0:
         raise Exception('任务已被停用')
-    relation = crud.get_af_id(db, job_id, 'datax')
-    res = send_execute(relation.af_id)
+    res = execute_job_services(db, job_id)
     return res['data']
 
 add_pagination(router)

+ 16 - 14
app/services/datax.py

@@ -25,9 +25,8 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
     }
     res = send_post('/af/af_job', af_job)
     af_job = res['data']
-    crud.create_relation(db, job_info.id,'datax', af_job['id'])
     send_submit(af_job['id'])
-    on_off_control(af_job['id'], job_info.trigger_status)
+    return af_job
 
 def datax_create_task(job_info: models.JobInfo):
     cmd_parameter = get_cmd_parameter(job_info.jvm_param)
@@ -60,7 +59,6 @@ def datax_create_task(job_info: models.JobInfo):
     af_task = res['data']
     return af_task
 
-
 def datax_update_job(job_info: models.JobInfo, db: Session):
     relation = crud.get_af_id(db, job_info.id, 'datax')
     af_job_id = relation.af_id
@@ -85,8 +83,7 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     res = send_put('/af/af_job', old_af_job['id'], af_job)
     af_job = res['data']
     send_submit(af_job['id'])
-    on_off_control(af_job['id'], job_info.trigger_status)
-
+    return af_job
 
 def datax_put_task(job_info: models.JobInfo,old_af_task):
     cmd_parameter = get_cmd_parameter(job_info.jvm_param)
@@ -117,15 +114,6 @@ def datax_put_task(job_info: models.JobInfo,old_af_task):
     af_task = res['data']
     return af_task
 
-
-def datax_job_submit(job_info: models.JobInfo, db: Session):
-    relation = crud.get_af_id(db, job_info.id, 'datax')
-    if not relation:
-        datax_create_job(job_info,db)
-    else:
-        datax_update_job(job_info,db)
-
-
 def on_off_control(af_job_id: int,status: int):
     for i in range(0,11):
         parsed_res = get_job_last_parsed_time(af_job_id)
@@ -136,4 +124,18 @@ def on_off_control(af_job_id: int,status: int):
             break
         if i >= 10:
             raise Exception(f"{af_job_id}==>状态修改失败")
+        time.sleep(2)
+
+def execute_job(af_job_id: int):
+    current_time = int(time.time())
+    send_submit(af_job_id)
+    for i in range(0,21):
+        parsed_res = get_job_last_parsed_time(af_job_id)
+        last_parsed_time = parsed_res['data']['last_parsed_time']
+        if last_parsed_time and int(last_parsed_time) > current_time:
+            res = send_execute(af_job_id)
+            print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
+            return res
+        if i >= 20:
+            raise Exception(f"{af_job_id}==>文件正在转化中")
         time.sleep(2)

+ 63 - 0
app/services/jm_homework.py

@@ -0,0 +1,63 @@
+import time
+from app import models, schemas
+from sqlalchemy.orm import Session
+import app.crud as crud
+from app.services.jm_job import jm_job_create_task, jm_job_update_task
+
+def create_jm_homework_services(db: Session, item: schemas.JmHomeworkCreate):
+    jm_homework_create = item.dict()
+    db_item = crud.get_jm_homework_by_name(db,jm_homework_create['name'])
+    if db_item:
+        raise Exception('作业名称已存在')
+    relation_list = []
+    if 'relation_list' in jm_homework_create.keys():
+        relation_list = jm_homework_create.pop('relation_list')
+    tag = jm_homework_create['tag']
+    crud.find_and_update(db, '作业标签', tag)
+    create_time: int = int(time.time())
+    db_item = models.JmHomework(**jm_homework_create,**{
+        'create_time': create_time,
+        'update_time': create_time,
+        'status': 1
+    })
+    # 创建airflow端作业
+    af_task = jm_job_create_task(db_item, relation_list, db)
+    # 创建local作业
+    db_item = crud.create_jm_homework(db,db_item)
+    # 若作业为dag类型并存在数据源关系,则新建数据源关系
+    if jm_homework_create['type'] == 'Dag' and relation_list is not None:
+        for relation in relation_list:
+            crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
+    # 创建关系表
+    crud.create_relation(db ,db_item.id, 'task', af_task['id'])
+    return db_item.to_dict()
+
+def update_jm_homework_service(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
+    jm_homework_update =update_item.dict(exclude_unset=True)
+    db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
+    if not db_item:
+        raise Exception('未找到该作业')
+    db_name_item = db.query(models.JmHomework)\
+        .filter(models.JmHomework.name == jm_homework_update['name'])\
+        .filter(models.JmHomework.status != 0)\
+        .filter(models.JmHomework.id != id).first()
+    if db_name_item:
+        raise Exception('作业名称已存在')
+    relation_list = []
+    if 'relation_list' in jm_homework_update.keys():
+        relation_list = jm_homework_update.pop('relation_list')
+    tag = jm_homework_update['tag']
+    crud.find_and_update(db, '作业标签', tag)
+    for k, v in jm_homework_update.items():
+        setattr(db_item, k, v)
+    db_item.update_time = int(time.time())
+    # 修改airflow端作业
+    af_task = jm_job_update_task(db_item, relation_list, db)
+    # 修改local作业
+    db_item = crud.update_jm_homework(db, id, db_item)
+    # 数据源关系修改
+    crud.delete_jm_relations(db,db_item.id)
+    if jm_homework_update['type'] == 'Dag' and relation_list is not None:
+        for relation in relation_list:
+            crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
+    return db_item.to_dict()

+ 32 - 44
app/services/jm_job.py

@@ -2,7 +2,7 @@ from asyncio import current_task
 import json
 import time
 from turtle import update
-from app import crud, models
+from app import crud, models, schemas
 from app.common import minio
 from app.core.datasource.datasource import DataSourceBase
 from app.crud.jm_homework_datasource_relation import get_jm_relations
@@ -18,10 +18,10 @@ type_dict = {
     "Dag": "sparks"
 }
 
-def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
+def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Session):
     content = ''
     if jm_homework.type == "Dag":
-        content = red_dag_and_format(jm_homework, db)
+        content = red_dag_and_format(jm_homework, relation_list, db)
     elif jm_homework.type == "Python":
         content = red_python_and_format(jm_homework)
     af_task = {
@@ -37,14 +37,13 @@ def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
     }
     res = send_post('/af/af_task', af_task)
     af_task = res['data']
-    crud.create_relation(db ,jm_homework.id, 'task', af_task['id'])
     return af_task
 
-def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
+def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Session):
     relation = crud.get_af_id(db, jm_homework.id, 'task')
     content = ''
     if jm_homework.type == "Dag":
-        content = content = red_dag_and_format(jm_homework, db)
+        content = content = red_dag_and_format(jm_homework, relation_list, db)
     elif jm_homework.type == "Python":
         content = red_python_and_format(jm_homework)
     af_task = {
@@ -62,21 +61,12 @@ def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
     af_task = res['data']
     return af_task
 
-def jm_homework_submit(jm_homework: models.JmHomework, db: Session):
-    task_relation = crud.get_af_id(db,jm_homework.id,'task')
-    if task_relation is None:
-        jm_job_create_task(jm_homework, db)
-    else:
-        jm_job_update_task(jm_homework, db)
-
-def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
-    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
-    homework_ids = [node.homework_id for node in nodes]
+def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
+    homework_ids = [node['homework_id'] for node in nodes]
     relations = crud.get_af_ids(db,homework_ids, 'task')
     se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
-    edges = crud.get_one_job_edges(db, jm_job_info.id)
-    dependence = [[se_id_to_af_id_dict[edge['in_node_id']],se_id_to_af_id_dict[str(edge['out_node_id'])]] for edge in edges]
+    dependence = [[se_id_to_af_id_dict[edge['source']],se_id_to_af_id_dict[str(edge['target'])]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
     cron.replace('?','*')
     af_job = {
@@ -96,20 +86,16 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
     }
     res = send_post('/af/af_job', af_job)
     af_job = res['data']
-    crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
     send_submit(af_job['id'])
-    # on_off_control(af_job['id'],jm_job_info.status)
+    return af_job
 
 
-def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
-    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
-    homework_ids = [node.homework_id for node in nodes]
-    node_id_to_h_id = {node.id:node.homework_id for node in nodes}
+def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
+    homework_ids = [node['homework_id'] for node in nodes]
     relations = crud.get_af_ids(db,homework_ids, 'task')
     se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
-    edges = crud.get_one_job_edges(db, jm_job_info.id)
-    dependence = [[se_id_to_af_id_dict[node_id_to_h_id[edge.in_node_id]],se_id_to_af_id_dict[node_id_to_h_id[edge.out_node_id]]] for edge in edges]
+    dependence = [[se_id_to_af_id_dict[edge['source']],se_id_to_af_id_dict[str(edge['target'])]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
     cron.replace('?','*')
     af_job = {
@@ -128,20 +114,9 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
     res = send_put('/af/af_job', job_relation.af_id, af_job)
     af_job = res['data']
     send_submit(af_job['id'])
-    on_off_control(af_job['id'],jm_job_info.status)
-
-
-def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
-    job_relation = crud.get_af_id(db,jm_job_info.id,'job')
-    if job_relation is None:
-        jm_job_create_job(jm_job_info, db)
-    else:
-        jm_job_update_job(jm_job_info, db)
-
 
-def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
-    relations = get_jm_relations(db,jm_homework.id)
-    node_relation_dict = { relation.node_uuid:relation for relation in relations}
+def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Session):
+    node_relation_dict = { relation['node_uuid']:relation for relation in relation_list}
     file = minio_client.get_file(jm_homework.dag_url)
     result = json.loads(file)
     edges = result['edges']
@@ -161,14 +136,14 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             for filed in fileds:
                 script += filed['dataField'] + ','
             script = script.strip(',')
-            ds_id = node_relation_dict[node['id']].datasource_id
+            ds_id = node_relation_dict[node['id']]['datasource_id']
             database_name = ""
             if ds_id  == -1:
                 database_name = DATABASE_NAME
             else:
                 data_source = crud.get_job_jdbc_datasource(db,ds_id)
                 database_name = data_source.database_name
-            script += ' from ' + database_name + '.'+node_relation_dict[node['id']].table
+            script += ' from ' + database_name + '.'+node_relation_dict[node['id']]['table']
             sub_node = {
                 "id": node['id'],
                 "name": node['name'],
@@ -178,7 +153,7 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             sub_nodes.append(sub_node)
         elif node['op'] == 'outputsource':
             fileds = node['data']['output_source']
-            ds_id = node_relation_dict[node['id']].datasource_id
+            ds_id = node_relation_dict[node['id']]['datasource_id']
             database_name = ""
             if ds_id  == -1:
                 database_name = DATABASE_NAME
@@ -186,7 +161,7 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
                 data_source = crud.get_job_jdbc_datasource(db,ds_id)
                 database_name = data_source.database_name
             script = '''def main_func (input0, spark,sc):
-    input0.write.mode("overwrite").saveAsTable("''' + database_name + '.'+node_relation_dict[node['id']].table+'''")'''
+    input0.write.mode("overwrite").saveAsTable("''' + database_name + '.'+node_relation_dict[node['id']]['table']+'''")'''
             inputs = {}
             index = 0
             input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
@@ -234,7 +209,6 @@ def red_python_and_format(jm_homework):
     file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
     return file.decode("utf-8")
 
-
 def on_off_control(af_job_id: int,status: int):
     for i in range(0,11):
         parsed_res = get_job_last_parsed_time(af_job_id)
@@ -245,4 +219,18 @@ def on_off_control(af_job_id: int,status: int):
             break
         if i >= 10:
             raise Exception(f"{af_job_id}==>执行失败")
+        time.sleep(2)
+
+def execute_job(af_job_id: int):
+    current_time = int(time.time())
+    send_submit(af_job_id)
+    for i in range(0,21):
+        parsed_res = get_job_last_parsed_time(af_job_id)
+        last_parsed_time = parsed_res['data']['last_parsed_time']
+        if last_parsed_time and int(last_parsed_time) > current_time:
+            res = send_execute(af_job_id)
+            print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
+            return res
+        if i >= 20:
+            raise Exception(f"{af_job_id}==>文件正在转化中")
         time.sleep(2)

+ 118 - 0
app/services/jm_job_info.py

@@ -0,0 +1,118 @@
+from sqlalchemy.orm import Session
+from app import models, schemas
+from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
+from app.utils.cron_utils import joint_cron_expression
+import app.crud as crud
+
+def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
+    jm_job_info_create = item.dict()
+    # 定时对象转为cron表达式
+    cron_expression_item = jm_job_info_create.pop('cron_expression', None)
+    if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
+        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
+        cron_select_type = cron_expression_item["cron_select_type"]
+        jm_job_info_create.update({
+            'cron_select_type': cron_select_type,
+            'cron_expression': cron_expression,
+        })
+    # 节点与边的剥离
+    nodes = jm_job_info_create.pop('nodes', None)
+    edges = jm_job_info_create.pop('edges', None)
+    db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
+        .filter(models.JmJobInfo.delete_status != 0).first()
+    if db_item:
+        raise Exception('定时任务名称已存在')
+    # 标签的存储
+    tag = jm_job_info_create['tag']
+    crud.find_and_update(db, '任务标签', tag)
+    jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
+        'status': 0,
+        'delete_status': 1,
+    })
+    # 创建airflow端任务
+    af_job = jm_job_create_job(jm_job_info,nodes,edges,db)
+    # 创建local端任务
+    jm_job_info = crud.create_jm_job_info(db,jm_job_info)
+    # 创建多作业节点与节点关系
+    create_jm_job_node(db, nodes, edges, jm_job_info.id)
+    # 创建关系
+    crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
+    return jm_job_info
+
+
+def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
+    jm_job_info_update = item.dict(exclude_unset=True)
+    # 定时对象转为cron表达式
+    cron_expression_item = jm_job_info_update.pop('cron_expression', None)
+    if jm_job_info_update['cron_type'] == 2:
+        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
+        cron_select_type = cron_expression_item["cron_select_type"]
+        jm_job_info_update.update({
+            'cron_select_type': cron_select_type,
+            'cron_expression': cron_expression,
+        })
+    # 节点与边的剥离
+    nodes = jm_job_info_update.pop('nodes', None)
+    edges = jm_job_info_update.pop('edges', None)
+    db_item = db.query(models.JmJobInfo)\
+        .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
+    if not db_item:
+        raise Exception('未找到该定时任务')
+    db_name_item = db.query(models.JmJobInfo)\
+        .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
+        .filter(models.JmJobInfo.delete_status != 0)\
+        .filter(models.JmJobInfo.id != item.id).first()
+    if db_name_item:
+        raise Exception('定时任务名称已存在')
+    # 标签的存储
+    tag = jm_job_info_update['tag']
+    crud.find_and_update(db, '任务标签', tag)
+    for k, v in jm_job_info_update.items():
+        setattr(db_item, k, v)
+    # 修改airflow端任务
+    af_job = jm_job_update_job(db_item,nodes,edges,db)
+    # 修改local端任务
+    db_item = crud.update_jm_job_info(db,db_item)
+    # 删除之前的作业节点并创建新作业节点与节点关系
+    crud.delete_job_node(db, db_item.id)
+    create_jm_job_node(db, nodes, edges, db_item.id)
+    return db_item
+
+
+def create_jm_job_node(db: Session, nodes, edges, job_id):
+    uuid_node_id = {}
+    if nodes is None or len(nodes) == 0:
+        return
+    for node in nodes:
+        uuid = node['id']
+        node_item = models.JmJobNode(**{
+            'job_id': job_id,
+            'homework_id': node['homework_id'],
+            'homework_name': node['homework_name'],
+            'start_point': 1,
+        })
+        node_item = crud.create_jm_job_node(db,node_item)
+        node_id = node_item.id
+        uuid_node_id.update({uuid:node_id})
+    if nodes is None or len(nodes) == 0:
+        return
+    for edge in edges:
+        edge_item = models.JmJobEdge(**{
+            'job_id': job_id,
+            'in_node_id': uuid_node_id[edge['source']],
+            'out_node_id': uuid_node_id[edge['target']]
+        })
+        edge = crud.create_jm_job_edge(db,edge_item)
+    return
+
+
+def update_jm_job_status_services(db: Session, job_id: int, status: int):
+    job_relation = crud.get_af_id(db,job_id,'job')
+    on_off_control(job_relation.af_id, status)
+    return crud.update_jm_job_status(db,job_id,status)
+
+def execute_job_services(db: Session, jm_job_id: int):
+    relation = crud.get_af_id(db, jm_job_id, 'job')
+    res = execute_job(relation.af_id)
+    return res
+

+ 80 - 0
app/services/job_info.py

@@ -0,0 +1,80 @@
+import time
+from app import models, schemas
+from app.services.datax import datax_create_job, datax_update_job, execute_job
+from app.utils.cron_utils import joint_cron_expression
+from sqlalchemy.orm import Session
+import app.crud as crud
+
+def create_job_info_services(db: Session, item: schemas.JobInfoCreate):
+    create_time: int = int(time.time())
+    item_dict = item.dict()
+    # 定时任务对象转为cron表达式
+    cron_expression_dict = item_dict.pop('cron_expression')
+    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
+    cron_select_type = cron_expression_dict["cron_select_type"]
+    item_dict.update({
+        'cron_select_type': cron_select_type,
+        'job_cron': cron_expression,
+    })
+    # 分区信息拼接
+    partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() and item_dict['partition_info'] != '' else None
+    partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() and item_dict['partition_time'] != '' else None
+    partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() and item_dict['partition_num'] != '' else None
+    partition_info_str = ''
+    if partition_info is not None and partition_time is not None and partition_num is not None:
+        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
+    elif partition_info is not None and (partition_time is None or partition_num is None):
+        raise Exception('分区信息不完善')
+    item_dict.update({
+        'partition_info': partition_info_str,
+    })
+    db_item = models.JobInfo(**item_dict, **{
+        'trigger_status': 0,
+        'create_time': create_time,
+        'update_time': create_time,
+        'delete_status': 1,
+    })
+    # 创建airflow端同步任务
+    af_job = datax_create_job(db_item, db)
+    # 创建本地同步任务
+    db_item = crud.create_job_info(db, db_item)
+    crud.create_relation(db, db_item.id,'datax', af_job['id'])
+    return db_item
+
+
+def update_job_info_services(db: Session, id: int, update_item: schemas.JobInfoUpdate):
+    # 获取任务信息
+    db_item = crud.get_job_info(db,id)
+    update_dict = update_item.dict(exclude_unset=True)
+    # 定时任务对象转为cron表达式
+    cron_expression_dict = update_dict.pop('cron_expression')
+    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
+    cron_select_type = cron_expression_dict["cron_select_type"]
+    update_dict.update({
+        'cron_select_type': cron_select_type,
+        'job_cron': cron_expression,
+    })
+    # 分区信息拼接
+    partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() and update_dict['partition_info'] != '' else None
+    partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys()  and update_dict['partition_time'] != '' else None
+    partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys()  and update_dict['partition_num'] != '' else None
+    partition_info_str = ''
+    if partition_info is not None and partition_time is not None and partition_num is not None:
+        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
+    elif partition_info is not None and (partition_time is None or partition_num is None):
+        raise Exception('分区信息不完善')
+    update_dict.update({
+        'partition_info': partition_info_str,
+    })
+    for k, v in update_dict.items():
+        setattr(db_item, k, v)
+    db_item.update_time = int(time.time())
+    # 修改airflow端同步任务
+    af_job = datax_update_job(db_item, db)
+    crud.update_job_info(db,id,db_item)
+    return db_item
+
+def execute_job_services(db: Session, job_id: int):
+    relation = crud.get_af_id(db, job_id, 'datax')
+    res = execute_job(relation.af_id)
+    return res