Parcourir la source

调试任务增加依赖

liweiquan il y a 2 ans
Parent
commit
1e31483215
2 fichiers modifiés avec 20 ajouts et 6 suppressions
  1. 19 5
      app/services/dag.py
  2. 1 1
      app/services/jm_job.py

+ 19 - 5
app/services/dag.py

@@ -10,7 +10,7 @@ from configs.settings import DefaultOption, config
 database_name = config.get('HIVE', 'DATABASE_NAME')
 
 def dag_create_job(dag_uuid:str,dag_script: str,db: Session):
-    af_task = dag_create_task(dag_script)
+    af_task = dag_create_task(dag_uuid,dag_script,db)
     af_job = {
         "tasks": [af_task],
         "name": "调试任务",
@@ -31,14 +31,21 @@ def dag_create_job(dag_uuid:str,dag_script: str,db: Session):
     crud.create_debug_relation(db,dag_uuid,'debug',af_job['id'])
     return af_job
 
-def dag_create_task(dag_script: str):
+def dag_create_task(dag_uuid:str,dag_script: str,db: Session):
+    envs = {}
+    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
+    if requirements_relation:
+        requirements = crud.get_requirements_status(db, dag_uuid)
+        if requirements.status != 2:
+            raise Exception('依赖未安装成功,不可执行')
+        envs.update({'requirement_package_path': f'jpt/requirements/dag_{dag_uuid.lower()})'})
     af_task = {
         "name": "调试作业",
         "file_urls": [],
         "script": dag_script,
         "cmd": "",
         "cmd_parameters": "",
-        "envs": {},
+        "envs": envs,
         "run_image": "",
         "task_type": "sparks",
         "user_id": 0,
@@ -71,14 +78,21 @@ def dag_update_job(dag_uuid:str,dag_script: str, db: Session):
     return af_job
 
 
-def dag_put_task(dag_script: str,old_af_task):
+def dag_put_task(dag_uuid: str, dag_script: str, db: Session, old_af_task):
+    envs = {}
+    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
+    if requirements_relation:
+        requirements = crud.get_requirements_status(db, dag_uuid)
+        if requirements.status != 2:
+            raise Exception('依赖未安装成功,不可执行')
+        envs.update({'requirement_package_path': f'jpt/requirements/dag_{dag_uuid.lower()})'})
     af_task = {
         "name": "调试作业",
         "file_urls": [],
         "script": dag_script,
         "cmd": "",
         "cmd_parameters": "",
-        "envs": {},
+        "envs": envs,
         "run_image": "",
         "task_type": "sparks",
     }

+ 1 - 1
app/services/jm_job.py

@@ -49,7 +49,7 @@ def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Sessio
     content = ''
     envs = {}
     if jm_homework.type == "Dag":
-        content = content = red_dag_and_format(jm_homework, relation_list, db)
+        content = red_dag_and_format(jm_homework, relation_list, db)
         requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
         if requirements_relation:
             envs.update({'requirement_package_path': f'jpt/requirements/dag_{jm_homework.dag_uuid.lower()})'})