Bladeren bron

Merge branch 'integration' of http://gogsb.soaringnova.com/sxwl_DL/datax-admin into integration

Zhang Li 2 jaren geleden
bovenliggende
commit
cbf4dd95e0
10 gewijzigde bestanden met toevoegingen van 50 en 15 verwijderingen
  1. 2 1
      app/routers/jm_job_log.py
  2. 20 5
      app/services/dag.py
  3. 5 4
      app/services/jm_job.py
  4. 1 1
      app/utils/cron_utils.py
  5. 4 1
      development.ini
  6. 4 1
      idctest.ini
  7. 3 0
      production.ini
  8. 3 0
      sxkj.ini
  9. 4 1
      txprod.ini
  10. 4 1
      txtest.ini

+ 2 - 1
app/routers/jm_job_log.py

@@ -127,5 +127,6 @@ def get_job_log_status(ids: str, token_data: schemas.TokenData = Depends(verify_
     id_to_status = {}
     for run_id in run_ids:
         res = get_job_run_status(run_id)
-        id_to_status.update({run_id:RUN_STATUS[res['data']['status']]})
+        # id_to_status.update({run_id:RUN_STATUS[res['data']['status']]})
+        id_to_status.update({run_id:res['data']['status']})
     return id_to_status

+ 20 - 5
app/services/dag.py

@@ -8,9 +8,10 @@ from app.common.hive import hiveDs
 from configs.settings import DefaultOption, config
 
 database_name = config.get('HIVE', 'DATABASE_NAME')
+requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')
 
 def dag_create_job(dag_uuid:str,dag_script: str,db: Session):
-    af_task = dag_create_task(dag_script)
+    af_task = dag_create_task(dag_uuid, dag_script, db)
     af_job = {
         "tasks": [af_task],
         "name": "调试任务",
@@ -31,14 +32,21 @@ def dag_create_job(dag_uuid:str,dag_script: str,db: Session):
     crud.create_debug_relation(db,dag_uuid,'debug',af_job['id'])
     return af_job
 
-def dag_create_task(dag_script: str):
+def dag_create_task(dag_uuid:str,dag_script: str,db: Session):
+    envs = {}
+    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
+    if requirements_relation:
+        requirements = crud.get_requirements_status(db, dag_uuid)
+        if requirements.status != 2:
+            raise Exception('依赖未安装成功,不可执行')
+        envs.update({'requirement_package_path': f'{requirement_path}dag_{dag_uuid.lower()})'})
     af_task = {
         "name": "调试作业",
         "file_urls": [],
         "script": dag_script,
         "cmd": "",
         "cmd_parameters": "",
-        "envs": {},
+        "envs": envs,
         "run_image": "",
         "task_type": "sparks",
         "user_id": 0,
@@ -71,14 +79,21 @@ def dag_update_job(dag_uuid:str,dag_script: str, db: Session):
     return af_job
 
 
-def dag_put_task(dag_script: str,old_af_task):
+def dag_put_task(dag_uuid:str,dag_script: str,db: Session,old_af_task):
+    envs = {}
+    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
+    if requirements_relation:
+        requirements = crud.get_requirements_status(db, dag_uuid)
+        if requirements.status != 2:
+            raise Exception('依赖未安装成功,不可执行')
+        envs.update({'requirement_package_path': f'{requirement_path}dag_{dag_uuid.lower()})'})
     af_task = {
         "name": "调试作业",
         "file_urls": [],
         "script": dag_script,
         "cmd": "",
         "cmd_parameters": "",
-        "envs": {},
+        "envs": envs,
         "run_image": "",
         "task_type": "sparks",
     }

+ 5 - 4
app/services/jm_job.py

@@ -11,6 +11,7 @@ from sqlalchemy.orm import Session
 from app.common.minio import minio_client
 from configs.settings import DefaultOption, config
 DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
+requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')
 
 type_dict = {
     "Java": "java",
@@ -22,10 +23,10 @@ def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Sessio
     content = ''
     envs = {}
     if jm_homework.type == "Dag":
-        content = content = red_dag_and_format(jm_homework, relation_list, db)
+        content = red_dag_and_format(jm_homework, relation_list, db)
         requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
         if requirements_relation:
-            envs.update({'requirement_package_path': f'jpt/requirements/dag_{jm_homework.dag_uuid.lower()})'})
+            envs.update({'requirement_package_path': f'{requirement_path}dag_{jm_homework.dag_uuid.lower()})'})
     elif jm_homework.type == "Python":
         content = red_python_and_format(jm_homework)
     af_task = {
@@ -48,10 +49,10 @@ def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Sessio
     content = ''
     envs = {}
     if jm_homework.type == "Dag":
-        content = content = red_dag_and_format(jm_homework, relation_list, db)
+        content = red_dag_and_format(jm_homework, relation_list, db)
         requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
         if requirements_relation:
-            envs.update({'requirement_package_path': f'jpt/requirements/dag_{jm_homework.dag_uuid.lower()})'})
+            envs.update({'requirement_package_path': f'{requirement_path}dag_{jm_homework.dag_uuid.lower()})'})
     elif jm_homework.type == "Python":
         content = red_python_and_format(jm_homework)
     af_task = {

+ 1 - 1
app/utils/cron_utils.py

@@ -113,7 +113,7 @@ def check_cron_hour(cron_expression):
     unit_list = ['minute', 'hour', 'day', 'month', 'week']
     for cron, unit in zip(cron_list, unit_list):
         if unit == 'hour':
-            if int(cron) < int(hour_min) or int(cron) > int(hour_max):
+            if int(cron) < int(hour_min) or int(cron) > int(hour_max)-1:
                 raise Exception(f'执行时间必须在每日{hour_min}~{hour_max}时之间')
 
 def run_get_next_time(cron_expression):

+ 4 - 1
development.ini

@@ -96,4 +96,7 @@ ingress_class = ''
 
 [CRON_CONFIG]
 hour_min = 4
-hour_max = 22
+hour_max = 22
+
+[REQUIREMENTS_CONFIG]
+path = jpt/requirements/

+ 4 - 1
idctest.ini

@@ -116,4 +116,7 @@ ingress_class = public
 
 [CRON_CONFIG]
 hour_min = 4
-hour_max = 22
+hour_max = 22
+
+[REQUIREMENTS_CONFIG]
+path = jpt/requirements/

+ 3 - 0
production.ini

@@ -107,3 +107,6 @@ path_type = ImplementationSpecific
 [CRON_CONFIG]
 hour_min = 4
 hour_max = 22
+
+[REQUIREMENTS_CONFIG]
+path = jpt/requirements/

+ 3 - 0
sxkj.ini

@@ -99,3 +99,6 @@ ingress_class =
 [CRON_CONFIG]
 hour_min = 4
 hour_max = 22
+
+[REQUIREMENTS_CONFIG]
+path = jpt/requirements/

+ 4 - 1
txprod.ini

@@ -113,4 +113,7 @@ ingress_class = prod-pri-ingress
 
 [CRON_CONFIG]
 hour_min = 4
-hour_max = 22
+hour_max = 22
+
+[REQUIREMENTS_CONFIG]
+path = jpt/requirements/

+ 4 - 1
txtest.ini

@@ -113,4 +113,7 @@ ingress_class = test-pri-ingress
 
 [CRON_CONFIG]
 hour_min = 4
-hour_max = 22
+hour_max = 22
+
+[REQUIREMENTS_CONFIG]
+path = jpt/requirements/