liweiquan преди 2 години
родител
ревизия
0073c19959
променени са 3 файла, в които са добавени 48 реда и са изтрити 10 реда
  1. +29 −4   app/routers/jm_homework.py
  2. +1 −2    app/services/datax.py
  3. +18 −4   app/services/jm_job.py

+ 29 - 4
app/routers/jm_homework.py

@@ -5,15 +5,16 @@ from fastapi import APIRouter
 from fastapi import Depends
 from sqlalchemy.orm import Session
 from app import schemas
-
-import app.crud as crud
+from app.common.hive import hiveDs
 from app.crud import jm_homework
 from app.services.jm_job import red_dag_and_format
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
-
+import app.crud as crud
 from app import get_db
+from configs.settings import DefaultOption, config
+DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
 
 
 
@@ -65,4 +66,28 @@ def delete_jm_homework(jm_id: int, db: Session = Depends(get_db)):
 def get_test_dag(db: Session = Depends(get_db)):
     jm_homework = crud.get_jm_homework_info(db, 83)
     res = red_dag_and_format(jm_homework, db)
-    return res
+    return res
+
+@router.get("/local_source")
+@web_try()
+@sxtimeit
+def get_local_source():
+    return [{
+            'database_name': DATABASE_NAME,
+            'datasource': "hive",
+            'datasource_name': DATABASE_NAME,
+            'id': -1
+    }]
+
+@router.get("/local_source_table")
+@web_try()
+@sxtimeit
+def get_local_source_table():
+    t_list = hiveDs.list_tables()
+    return t_list
+
+@router.get("/local_source_table_schema")
+@web_try()
+@sxtimeit
+def get_local_source_table_schema(table_name: str, db: Session = Depends(get_db)):
+    return hiveDs.get_table_schema(table_name)

+ 1 - 2
app/services/datax.py

@@ -27,8 +27,7 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
     af_job = res['data']
     crud.create_relation(db, job_info.id,'datax', af_job['id'])
     send_submit(af_job['id'])
-    get_job_last_parsed_time()
-    # on_off_control(af_job['id'], job_info.trigger_status)
+    on_off_control(af_job['id'], job_info.trigger_status)
 
 def datax_create_task(job_info: models.JobInfo):
     cmd_parameter = get_cmd_parameter(job_info.jvm_param)

+ 18 - 4
app/services/jm_job.py

@@ -9,6 +9,8 @@ from app.crud.jm_homework_datasource_relation import get_jm_relations
 from app.utils.send_util import *
 from sqlalchemy.orm import Session
 from app.common.minio import minio_client
+from configs.settings import DefaultOption, config
+DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
 
 type_dict = {
     "Java": "java",
@@ -159,8 +161,14 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             for filed in fileds:
                 script += filed['dataField'] + ','
             script = script.strip(',')
-            data_source = crud.get_job_jdbc_datasource(db,node_relation_dict[node['id']].datasource_id)
-            script += ' from ' + data_source.database_name + '.'+node_relation_dict[node['id']].table+''
+            ds_id = node_relation_dict[node['id']].datasource_id
+            database_name = ""
+            if ds_id  == -1:
+                database_name = DATABASE_NAME
+            else:
+                data_source = crud.get_job_jdbc_datasource(db,ds_id)
+                database_name = data_source.database_name
+            script += ' from ' + database_name + '.'+node_relation_dict[node['id']].table
             sub_node = {
                 "id": node['id'],
                 "name": node['name'],
@@ -170,9 +178,15 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             sub_nodes.append(sub_node)
         elif node['op'] == 'outputsource':
             fileds = node['data']['output_source']
-            data_source = crud.get_job_jdbc_datasource(db,node_relation_dict[node['id']].datasource_id)
+            ds_id = node_relation_dict[node['id']].datasource_id
+            database_name = ""
+            if ds_id  == -1:
+                database_name = DATABASE_NAME
+            else:
+                data_source = crud.get_job_jdbc_datasource(db,ds_id)
+                database_name = data_source.database_name
             script = '''def main_func (input0, spark,sc):
-    input0.write.mode("overwrite").saveAsTable("''' + data_source.database_name + '.'+node_relation_dict[node['id']].table+'''")'''
+    input0.write.mode("overwrite").saveAsTable("''' + database_name + '.'+node_relation_dict[node['id']].table+'''")'''
             inputs = {}
             index = 0
             input_list = t_s[node['id']] if node['id'] in t_s.keys() else []