merge remote

Zhang Li, 2 years ago
Parent
Current commit
ad0c571df3

+ 38 - 1
app/crud/relation.py

@@ -3,6 +3,9 @@ from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
 
+from app.utils.send_util import get_running_status
+from constants.constants import RUN_STATUS
+
 def create_relation(db: Session, se_id: int, type: str, af_id: int):
     db_item = models.Relation(**{"se_id": se_id,
                                  "type": type,
@@ -21,6 +24,30 @@ def create_debug_relation(db: Session, dag_uuid: str, type: str, af_id: int):
     db.refresh(db_item)
     return db_item
 
+def create_or_update_requirements_relation(db: Session, dag_uuid: str, af_run_id: str, status: int):
+    relation: models.Relation = db.query(models.Relation).filter(models.Relation.type == "requirements")\
+        .filter(models.Relation.dag_uuid == dag_uuid).first()
+    if relation:
+        relation.af_run_id = af_run_id
+        relation.status = status
+        db.commit()
+        db.flush()
+    else:
+        relation = models.Relation(**{"dag_uuid": dag_uuid,
+                                 "type": "requirements",
+                                 "af_id": -1,
+                                 "af_run_id": af_run_id,
+                                 "status": status})
+        db.add(relation)
+        db.commit()
+    db.refresh(relation)
+    return relation
+
+def get_requirements_relation(db: Session, dag_uuid: str):
+    relation: models.Relation = db.query(models.Relation).filter(models.Relation.type == "requirements")\
+        .filter(models.Relation.dag_uuid == dag_uuid).first()
+    return relation
+
 def get_af_id(db: Session, se_id: int, type: str):
     res: models.Relation = db.query(models.Relation)\
         .filter(models.Relation.se_id == se_id)\
@@ -43,4 +70,14 @@ def delete_relation(db: Session, se_id: int, type: str):
     res: models.Relation = db.query(models.Relation)\
         .filter(models.Relation.se_id == se_id)\
         .filter(models.Relation.type == type).delete()
-    return res
+    return res
+
+def get_requirements_status(db: Session, dag_uuid: str):
+    relation = get_requirements_relation(db,dag_uuid)
+    if not relation:
+        return relation
+    af_run_id = relation.af_run_id
+    running_res = get_running_status(str(-1), af_run_id)
+    running_status = running_res['data']['status']
+    relation = create_or_update_requirements_relation(db,dag_uuid, af_run_id, RUN_STATUS[running_status])
+    return relation
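
The new CRUD helpers bind a "requirements" relation row to an airflow run and refresh its stored status on read. A minimal usage sketch (not part of the commit), assuming a SQLAlchemy Session supplied by the app's usual get_db dependency:

    from sqlalchemy.orm import Session
    from app import crud

    def print_requirements_state(db: Session, dag_uuid: str) -> None:
        # re-queries airflow through get_running_status and persists the mapped status
        rel = crud.get_requirements_status(db, dag_uuid)
        if rel is None:
            print("no requirements have been installed for this DAG yet")
        else:
            # status: 0 scheduled, 1 running, 2 success, 3 failed (see the data.sql comment below)
            print(rel.af_run_id, rel.status)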

+ 4 - 0
app/models/relation.py

@@ -15,3 +15,7 @@ class Relation(BaseModel):
     af_id = Column(Integer, nullable=False)
     # dag_uuid
     dag_uuid = Column(String)
+    # af_run_id
+    af_run_id = Column(String)
+    # execution status
+    status = Column(Integer)

+ 25 - 1
app/routers/dag.py

@@ -5,7 +5,7 @@ from fastapi import APIRouter
 from app.common.security.auth import verify_users
 from app.services.dag import dag_job_submit, get_tmp_table_name
 from app import crud, models, schemas
-from app.utils.send_util import get_job_run_status, get_task_log
+from app.utils.send_util import get_job_run_status, get_running_status, get_task_log, post_install_requirements
 from constants.constants import RUN_STATUS
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
@@ -26,6 +26,11 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def execute_dag(dag: schemas.Dag, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
+    relation = crud.get_requirements_status(db, dag.dag_uuid)
+    if relation and relation.status in [0,1]:
+        raise Exception('依赖正在安装中')
+    elif relation and relation.status in [3]:
+        raise Exception('依赖安装失败,请重新安装')
     af_job = dag_job_submit(dag.dag_uuid, dag.dag_script,db)
     return af_job
 
@@ -91,3 +96,22 @@ def get_dag_debug_result(dag_uuid: str, node_id: str, out_pin: int , token_data:
         })
     return result
 
+@router.post("/install_requirements")
+@web_try()
+@sxtimeit
+def install_requirements(item: schemas.Requirements, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
+    res = post_install_requirements(item.requirements, f'jpt/requirements/dag_{item.dag_uuid.lower()}')
+    af_run_id = res['data']['af_run_id']
+    running_res = get_running_status(str(-1), af_run_id)
+    running_status = running_res['data']['status']
+    relation = crud.create_or_update_requirements_relation(db,item.dag_uuid, af_run_id, RUN_STATUS[running_status])
+    return relation
+
+@router.get("/requirements_status")
+@web_try()
+@sxtimeit
+def get_requirements_status(dag_uuid: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
+    relation = crud.get_requirements_status(db, dag_uuid)
+    if not relation:
+        return "暂无依赖"
+    return relation
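
How a client might drive the two new routes: POST the requirements string with the DAG uuid, then query the status endpoint until the relation reports success. A hedged sketch only; the host, the /dag router prefix, the auth header, and the response envelope are assumptions, not shown in this diff:

    import requests

    BASE = "http://localhost:8080"                  # assumed host; router prefix assumed to be /dag
    HEADERS = {"Authorization": "Bearer <token>"}   # assumed auth header for verify_users

    # kick off the pip install for one DAG
    payload = {"requirements": "pandas==1.5.3\nrequests", "dag_uuid": "0a1b2c"}  # hypothetical values
    requests.post(f"{BASE}/dag/install_requirements", json=payload, headers=HEADERS)

    # poll the status endpoint; status 2 = success, 3 = failed (see the data.sql comment below)
    res = requests.get(f"{BASE}/dag/requirements_status", params={"dag_uuid": "0a1b2c"}, headers=HEADERS)
    print(res.json())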

+ 1 - 8
app/routers/jm_homework.py

@@ -49,11 +49,4 @@ def delete_jm_homework(jm_id: int, token_data: schemas.TokenData = Depends(verif
     res = crud.find_by_homework_and_job(db, jm_job_ids,jm_id)
     if len(res) > 0:
         raise Exception("该作业正在被定时任务使用,不可删除")
-    return crud.delete_jm_homework(db, jm_id)
-
-
-@router.get("/test")
-def get_test_dag(db: Session = Depends(get_db)):
-    jm_homework = crud.get_jm_homework_info(db, 83)
-    res = red_dag_and_format(jm_homework, db)
-    return res
+    return crud.delete_jm_homework(db, jm_id)

+ 4 - 19
app/routers/jm_job_info.py

@@ -1,8 +1,4 @@
 import base64
-from datetime import datetime
-from datetime import timedelta
-from datetime import timezone
-import croniter
 import re
 from typing import Optional, List
 from fastapi import APIRouter
@@ -13,7 +9,7 @@ from app.common.security.auth import verify_users
 import app.crud as crud
 from app.schemas import cron_expression
 from app.services.jm_job import on_off_control
-from app.services.jm_job_info import create_jm_job_info_services, execute_job_services, update_jm_job_info_services, update_jm_job_status_services
+from app.services.jm_job_info import create_jm_job_info_services, execute_job_services, get_requirements_status_by_job_id, update_jm_job_info_services, update_jm_job_status_services
 from app.utils.cron_utils import *
 from app.utils.send_util import send_delete, send_execute
 from utils.sx_time import sxtimeit
@@ -59,12 +55,15 @@ def get_jm_job_infos(token_data: schemas.TokenData = Depends(verify_users), db:
             else:
                 res.update({job_id: [log]})
     for jm_job in jm_job_list:
+        # run history
         history = res[jm_job.id] if jm_job.id in res.keys() else []
         history.sort(key=lambda x: x['start_time'], reverse=True)
         jm_job_dict = jm_job.to_dict()
         history = history[0:10]
         history.sort(key=lambda x: x['start_time'], reverse=False)
         jm_job_dict.update({'history':history})
+        requirements_status = get_requirements_status_by_job_id(db,jm_job.id)
+        jm_job_dict.update({'requirements_status': 0 if requirements_status else 1})
         res_list.append(jm_job_dict)
     return res_list
 
@@ -153,7 +152,6 @@ def api_execute_jm_job(encryption_id: str, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def get_cron_expression(cron_expression: schemas.CronExpression, token_data: schemas.TokenData = Depends(verify_users), ):
-    print(cron_expression)
     cron = joint_cron_expression(cron_expression)
     return cron
 
@@ -162,17 +160,4 @@ def get_cron_expression(cron_expression: schemas.CronExpression, token_data: sch
 @sxtimeit
 def get_cron_next_execute(cron_expression: str, token_data: schemas.TokenData = Depends(verify_users), ):
     execute_list = run_get_next_time(cron_expression)
-    return execute_list
-
-
-def run_get_next_time(cron_expression):
-    SHA_TZ = timezone(timedelta(hours=8),name='Asia/Shanghai',)
-    utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)
-    now = utc_now.astimezone(SHA_TZ)
-    cron_str = cron_expression.replace('?','*')
-    cron = croniter.croniter(cron_str, now)
-    execute_list = []
-    for i in range(0, 5):
-        next_time = cron.get_next(datetime).strftime("%Y-%m-%d %H:%M")
-        execute_list.append(next_time)
     return execute_list

+ 8 - 1
app/schemas/dag.py

@@ -2,4 +2,11 @@ from pydantic import BaseModel
 
 class Dag(BaseModel):
     dag_uuid: str
-    dag_script: str
+    dag_script: str
+
+
+class Requirements(BaseModel):
+    # dependencies (requirements string)
+    requirements: str
+    # uuid of the DAG file
+    dag_uuid: str

+ 11 - 3
app/services/jm_job.py

@@ -20,8 +20,12 @@ type_dict = {
 
 def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Session):
     content = ''
+    envs = {}
     if jm_homework.type == "Dag":
-        content = red_dag_and_format(jm_homework, relation_list, db)
+        content = red_dag_and_format(jm_homework, relation_list, db)
+        requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
+        if requirements_relation:
+            envs.update({'requirement_package_path': f'jpt/requirements/dag_{jm_homework.dag_uuid.lower()}'})
     elif jm_homework.type == "Python":
         content = red_python_and_format(jm_homework)
     af_task = {
@@ -30,7 +34,7 @@ def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Sessio
         "script": content if jm_homework.type != "Java" else "",
         "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
         "cmd_parameters": "",
-        "envs": {},
+        "envs": envs,
         "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
         "task_type": type_dict[jm_homework.type],
         "user_id": 0,
@@ -42,8 +46,12 @@ def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Sessio
 def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Session):
     relation = crud.get_af_id(db, jm_homework.id, 'task')
     content = ''
+    envs = {}
     if jm_homework.type == "Dag":
         content = red_dag_and_format(jm_homework, relation_list, db)
+        requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
+        if requirements_relation:
+            envs.update({'requirement_package_path': f'jpt/requirements/dag_{jm_homework.dag_uuid.lower()}'})
     elif jm_homework.type == "Python":
         content = red_python_and_format(jm_homework)
     af_task = {
@@ -52,7 +60,7 @@ def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Sessio
         "script": content if jm_homework.type != "Java" else "",
         "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
         "cmd_parameters": "",
-        "envs": {},
+        "envs": envs,
         "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
         "task_type": type_dict[jm_homework.type],
         "user_id": 0,

+ 13 - 0
app/services/jm_job_info.py

@@ -125,6 +125,10 @@ def create_jm_job_node(db: Session, nodes, edges, job_id):
 
 
 def update_jm_job_status_services(db: Session, job_id: int, status: int):
+    if status == 1:
+        requirements_status = get_requirements_status_by_job_id(db,job_id)
+        if not requirements_status:
+            raise Exception('依赖未安装完成,不可开启')
     job_relation = crud.get_af_id(db,job_id,'job')
     on_off_control(job_relation.af_id, status)
     return crud.update_jm_job_status(db,job_id,status)
@@ -134,3 +138,12 @@ def execute_job_services(db: Session, jm_job_id: int):
     res = execute_job(relation.af_id)
     return res
 
+def get_requirements_status_by_job_id(db: Session, job_id: int):
+    nodes = crud.get_one_job_nodes(db, job_id)
+    homeworks = crud.get_jm_homeworks_by_ids(db, [node.homework_id for node in nodes])
+    for homework in homeworks:
+        relation = crud.get_requirements_status(db, homework.dag_uuid) if homework.type == "Dag" else None
+        if relation and relation.status != 2:
+            return False
+    return True
+
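
get_requirements_status_by_job_id returns False as soon as any Dag-type homework in the job has a requirements relation whose status is not 2 (success), and update_jm_job_status_services uses that as a gate before enabling a job. A small sketch of that flow (not part of the commit), with the Session supplied by the caller:

    from sqlalchemy.orm import Session
    from app.services.jm_job_info import (
        get_requirements_status_by_job_id,
        update_jm_job_status_services,
    )

    def try_enable_job(db: Session, job_id: int):
        # mirror the guard added above: only switch the job on once every Dag
        # homework's requirements relation reports status 2 (success)
        if not get_requirements_status_by_job_id(db, job_id):
            print("requirements are still installing or failed; job stays off")
            return None
        return update_jm_job_status_services(db, job_id, 1)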

+ 52 - 10
app/utils/cron_utils.py

@@ -1,5 +1,14 @@
 import re
 from app import schemas
+from datetime import datetime
+from datetime import timedelta
+from datetime import timezone
+import croniter
+
+from configs.settings import config
+
+hour_min = config.get('CRON_CONFIG', 'hour_min')
+hour_max = config.get('CRON_CONFIG', 'hour_max')
 
 
 def joint_cron_expression(item: schemas.CronExpression):
@@ -8,7 +17,7 @@ def joint_cron_expression(item: schemas.CronExpression):
         if item.minute is not None:
             cron += '0/'+str(item.minute) + ' *'
         elif item.hour is not None:
-            cron += '0 0/'+str(item.hour)
+            cron += '0 4-22/'+str(item.hour)
         else: cron += '*'
         cron += ' * * *'
     elif item.cron_select_type == 1:
@@ -53,28 +62,42 @@ def joint_cron_expression(item: schemas.CronExpression):
         else: cron += ' *'
     else:
         cron = item.cron_expression
-    match_obj = check_cron_expression(cron)
-    if match_obj:
-        return cron
-    return "cron校验未通过"
+    check_cron_expression(cron)
+    check_cron_hour(cron)
+    return cron
 
+# standard version
+# reg_list = [
+#     "^((([0-9]|[1-5][0-9])(\\,|\\-|\\/){1}([1-9]|[1-5][0-9]))|([0-9]|[0-5][0-9])|(\\*))$",
+#     "^((([0-9]|[1][0-9]|2[0-3])(\\,|\\-|\\/){1}([1-9]|[1][0-9]|2[0-3]))|([0-9]|[1][0-9]|2[0-3])|(\\*))$",
+#     "^((([1-9]|[1-2][0-9]|3[01])(\\,|\\-|\\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\\*)|(\\?))$",
+#     "^((([1-9]|[1][0-2])(\\,|\\-|\\/){1}([1-9]|[1][0-2]))|([1-9]|[1][0-2])|(\\*))$",
+#     "^((([1-7])(\\,|\\-|\\/){1}([1-7]))|([1-7])|(\\*)|(\\?))$"
+# ]
+
+# advanced version
+# reg_list = [
+#     "^((([0-9]|[1-5][0-9])(\\,|\\-){1}([1-9]|[1-5][0-9])(\\/){1}([1-9]|[1-5][0-9]))|(([0-9]|[1-5][0-9])(\\,|\\-|\\/){1}([1-9]|[1-5][0-9]))|([0-9]|[0-5][0-9])|(\\*))$",
+#     "^((([0-9]|[1][0-9]|2[0-3])(\\,|\\-){1}([1-9]|[1][0-9]|2[0-3])(\\/){1}([1-9]|[1][0-9]|2[0-3]))|(([0-9]|[1][0-9]|2[0-3])(\\,|\\-|\\/){1}([1-9]|[1][0-9]|2[0-3]))|([0-9]|[1][0-9]|2[0-3])|(\\*))$",
+#     "^((([1-9]|[1-2][0-9]|3[01])(\\,|\\-){1}([1-9]|[1-2][0-9]|3[01])(\\/){1}([1-9]|[1-2][0-9]|3[01]))|(([1-9]|[1-2][0-9]|3[01])(\\,|\\-|\\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\\*)|(\\?))$",
+#     "^((([1-9]|[1][0-2])(\\,|\\-){1}([1-9]|[1][0-2])(\\/){1}([1-9]|[1][0-2]))|(([1-9]|[1][0-2])(\\,|\\-|\\/){1}([1-9]|[1][0-2]))|([1-9]|[1][0-2])|(\\*))$",
+#     "^((([1-7])(\\,|\\-){1}([1-7])(\\/){1}([1-7]))|(([1-7])(\\,|\\-|\\/){1}([1-7]))|([1-7])|(\\*)|(\\?))$"
+# ]
 
 def check_cron_expression(cron_expression):
-    print(cron_expression)
     cron_list = cron_expression.split(' ')
     unit_list = ['minute', 'hour', 'day', 'month', 'week']
     reg_list = [
-        "^((([0-9]|[1-5][0-9])(\\,|\\-|\\/){1}([1-9]|[1-5][0-9]))|([0-9]|[0-5][0-9])|(\\*))$",
-        "^((([0-9]|[1][0-9]|2[0-3])(\\,|\\-|\\/){1}([1-9]|[1][0-9]|2[0-3]))|([0-9]|[1][0-9]|2[0-3])|(\\*))$",
+        "^(([0-9]|[0-5][0-9]))$",
+        "^(([0-9]|[1][0-9]|2[0-3]))$",
         "^((([1-9]|[1-2][0-9]|3[01])(\\,|\\-|\\/){1}([1-9]|[1-2][0-9]|3[01]))|([1-9]|[1-2][0-9]|3[01])|(\\*)|(\\?))$",
         "^((([1-9]|[1][0-2])(\\,|\\-|\\/){1}([1-9]|[1][0-2]))|([1-9]|[1][0-2])|(\\*))$",
         "^((([1-7])(\\,|\\-|\\/){1}([1-7]))|([1-7])|(\\*)|(\\?))$"
-        ]
+    ]
     for cron, unit, reg in zip(cron_list, unit_list, reg_list):
         match_obj = re.match(reg, cron)
         if match_obj is None:
             raise Exception(f'在{unit}的位置上,数据校验错误')
-    return True
 
 def parsing_cron_expression(cron_expression):
     cron_list = cron_expression.split(' ')
@@ -85,3 +108,22 @@ def parsing_cron_expression(cron_expression):
             cron_dict.update({unit: cron.replace('0/','').replace('1/','')})
     return cron_dict
 
+def check_cron_hour(cron_expression):
+    cron_list = cron_expression.split(' ')
+    unit_list = ['minute', 'hour', 'day', 'month', 'week']
+    for cron, unit in zip(cron_list, unit_list):
+        if unit == 'hour':
+            if int(cron) < int(hour_min) or int(cron) > int(hour_max):
+                raise Exception(f'执行时间必须在每日{hour_min}~{hour_max}时之间')
+
+def run_get_next_time(cron_expression):
+    SHA_TZ = timezone(timedelta(hours=8),name='Asia/Shanghai',)
+    utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)
+    now = utc_now.astimezone(SHA_TZ)
+    cron_str = cron_expression.replace('?','*')
+    cron = croniter.croniter(cron_str, now)
+    execute_list = []
+    for i in range(0, 5):
+        next_time = cron.get_next(datetime).strftime("%Y-%m-%d %H:%M")
+        execute_list.append(next_time)
+    return execute_list
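
check_cron_hour only inspects the hour field and rejects values outside the CRON_CONFIG window, while run_get_next_time previews the next five firings in Asia/Shanghai time. A short sketch (not part of the commit) with an illustrative expression:

    from app.utils.cron_utils import check_cron_expression, check_cron_hour, run_get_next_time

    cron = "30 9 * * ?"             # 09:30 every day, hypothetical input
    check_cron_expression(cron)     # raises if any field is malformed
    check_cron_hour(cron)           # raises unless hour_min <= 9 <= hour_max (4..22 in the ini files)
    print(run_get_next_time(cron))  # next five run times, e.g. ['2023-01-02 09:30', ...]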

+ 22 - 1
app/utils/send_util.py

@@ -180,4 +180,25 @@ def get_jupyter_status(data):
         return res.json()
     else:
         msg = result['msg'] if 'msg' in result.keys() else result
-        raise Exception(f'获取jupyter服务状态,请求jupyter端失败-->{msg}')
+        raise Exception(f'获取jupyter服务状态,请求jupyter端失败-->{msg}')
+
+# ask airflow to install the requirements
+def post_install_requirements(requirements_str: str,target_path: str):
+    res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/special_job/install_requirements?requirements_str={requirements_str}&target_path={target_path}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'安装依赖-->请求airflow失败-->{msg}')
+
+
+# get the run status by job_id and af_run_id
+def get_running_status(job_id: str, af_run_id: str):
+    res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/running_status/{job_id}/{af_run_id}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'获取任务运行状态,请求airflow失败-->{msg}')
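
The /install_requirements route combines these two senders: it submits the pip install to airflow, then immediately probes the run and maps airflow's status string through RUN_STATUS. A condensed sketch of that flow (the contents of RUN_STATUS are not shown in this commit):

    from app.utils.send_util import post_install_requirements, get_running_status
    from constants.constants import RUN_STATUS

    def submit_and_probe(requirements: str, dag_uuid: str):
        # start the install job on airflow, targeting the per-DAG requirements path
        res = post_install_requirements(requirements, f"jpt/requirements/dag_{dag_uuid.lower()}")
        af_run_id = res["data"]["af_run_id"]
        # job_id -1 is the sentinel value the commit passes for this special job
        running = get_running_status(str(-1), af_run_id)
        return af_run_id, RUN_STATUS[running["data"]["status"]]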

+ 7 - 0
data/data.sql

@@ -509,4 +509,11 @@ BEGIN;
 INSERT INTO `users` (`id`, `name`, `code`, `username`, `password`, `create_time`, `user_token`) VALUES (1, 'admin', '42acda425b4f11ed8c18f346385e9a10', 'admin', 'pbkdf2:sha256:150000$LKBvPzSN$8c14ad943aa4ee6863562f5e2194b379036229bd17d5f592c8b812c07a1ece7f', 1669109848, '');
 COMMIT;
 
+-- ----------------------------
+-- Alter for relation
+-- ----------------------------
+ALTER TABLE `relation`
+ADD COLUMN `af_run_id` varchar(100) NULL COMMENT 'airflow运行id' AFTER `dag_uuid`,
+ADD COLUMN `status` tinyint NULL COMMENT '执行状态(0:调度中,1:运行中,2:成功,3:失败)' AFTER `af_run_id`;
+
 SET FOREIGN_KEY_CHECKS = 1;

+ 5 - 1
development.ini

@@ -92,4 +92,8 @@ tag = sxkj
 host = aihub-dag.sxkj.com
 chart = aihub-dag-jupyter.tgz
 path_type = ImplementationSpecific
-ingress_class = ''
+ingress_class = ''
+
+[CRON_CONFIG]
+hour_min = 4
+hour_max = 22

+ 5 - 1
idctest.ini

@@ -112,4 +112,8 @@ tag = idctest
 host = aihub-dag-idctest.digitalyili.com
 chart = aihub-dag-jupyter.tgz
 path_type = ImplementationSpecific
-ingress_class = public
+ingress_class = public
+
+[CRON_CONFIG]
+hour_min = 4
+hour_max = 22

+ 4 - 0
production.ini

@@ -103,3 +103,7 @@ tag = test
 host = ailab-test.digitalyili.com
 chart = aihub-dag-jupyter.tgz
 path_type = ImplementationSpecific
+
+[CRON_CONFIG]
+hour_min = 4
+hour_max = 22

+ 4 - 0
sxkj.ini

@@ -95,3 +95,7 @@ host = aihub-dag.sxkj.com
 chart = aihub-dag-jupyter.tgz
 path_type = ImplementationSpecific
 ingress_class =
+
+[CRON_CONFIG]
+hour_min = 4
+hour_max = 22

+ 4 - 0
txprod.ini

@@ -110,3 +110,7 @@ host = ailab.digitalyili.com
 chart = aihub-dag-jupyter.tgz
 path_type = ImplementationSpecific
 ingress_class = prod-pri-ingress
+
+[CRON_CONFIG]
+hour_min = 4
+hour_max = 22

+ 4 - 0
txtest.ini

@@ -110,3 +110,7 @@ host = ailab-test.digitalyili.com
 chart = aihub-dag-jupyter.tgz
 path_type = ImplementationSpecific
 ingress_class = test-pri-ingress
+
+[CRON_CONFIG]
+hour_min = 4
+hour_max = 22