Parcourir la source

拼接判断修改

liweiquan il y a 2 ans
Parent
commit
a75cbb1451

+ 2 - 0
app/crud/jm_homework.py

@@ -5,6 +5,7 @@ from sqlalchemy.orm import Session
 from app.crud.constant import find_and_update
 
 from app.crud.jm_homework_datasource_relation import create_jm_hd_relation, delete_jm_relations, get_jm_relations
+from app.services.jm_job import jm_job_create_task
 
 
 def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
@@ -24,6 +25,7 @@ def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
         'update_time': create_time,
         'status': 1
     })
+    jm_job_create_task(db_item, db)
     db.add(db_item)
     db.commit()
     db.refresh(db_item)

+ 1 - 0
app/routers/dag.py

@@ -52,3 +52,4 @@ def get_file_byte(filename, chunk_size=1024):
                 yield content
             else:
                 break
+

+ 3 - 1
app/routers/jm_homework.py

@@ -7,6 +7,7 @@ from sqlalchemy.orm import Session
 from app import schemas
 
 import app.crud as crud
+from app.crud import jm_homework
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
@@ -25,7 +26,8 @@ router = APIRouter(
 @sxtimeit
 def create_jm_homework(item: schemas.JmHomeworkCreate, db: Session = Depends(get_db)):
     # 根据获取到的文件路径另存一份并改变
-    return crud.create_jm_homework(db, item)
+    jm_homework = crud.create_jm_homework(db, item)
+    return jm_homework
 
 @router.get("/")
 @web_try()

+ 0 - 2
app/services/datax.py

@@ -1,5 +1,3 @@
-
-
 from app import crud, models
 from app.utils.send_util import *
 from app.utils.utils import get_cmd_parameter

+ 45 - 0
app/services/jm_job.py

@@ -0,0 +1,45 @@
+from app import crud, models
+from app.utils.send_util import *
+from app.utils.utils import get_cmd_parameter
+from sqlalchemy.orm import Session
+
+type_dict = {
+    "Java": "java",
+    "Python": "python",
+    "Dag": "sparks"
+}
+
+def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
+    af_task = {
+        "name": jm_homework.name,
+        "file_urls": [jm_homework.dag_url] if jm_homework.type == "Dag" else [jm_homework.script_file],
+        "script": "",
+        "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
+        "cmd_parameters": "",
+        "envs": {},
+        "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
+        "task_type": type_dict[jm_homework.type],
+        "user_id": 0,
+    }
+    res = send_post('/jpt/af_task', af_task)
+    af_task = res['data']
+    crud.create_relation(db ,jm_homework.id, jm_homework.type, af_task['id'])
+    return af_task
+
+def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
+    relation = crud.get_af_id(db, jm_homework.id, jm_homework.type)
+    af_task = {
+        "name": jm_homework.name,
+        "file_urls": [jm_homework.dag_url] if jm_homework.type == "Dag" else [jm_homework.script_file],
+        "script": "",
+        "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
+        "cmd_parameters": "",
+        "envs": {},
+        "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
+        "task_type": type_dict[jm_homework.type],
+        "user_id": 0,
+    }
+    res = send_put('/jpt/af_task', relation.af_id, af_task)
+    af_task = res['data']
+
+    return af_task

+ 3 - 3
app/utils/utils.py

@@ -33,11 +33,11 @@ def byte_conversion(size):
 def get_cmd_parameter(jvm_param, inc_start_time, replace_param, partition_info):
     cmd_parameter = ''
     current_time = int(time.time())
-    if jvm_param is not None:
+    if jvm_param is not None and jvm_param != '':
         cmd_parameter += '-j "' + jvm_param + '" '
-    if replace_param is not None:
+    if replace_param is not None and replace_param != '':
         cmd_parameter += '-p "' + (replace_param % (inc_start_time,current_time))
-    if partition_info is not None:
+    if partition_info is not None and partition_info != '':
         partition_list = partition_info.split(',')
         partition_time = current_time - 86400*int(partition_list[1])
         partition_time_format = partition_list[2].replace('yyyy','%Y').replace('MM','%m').replace('dd','%d')