
Merge remote-tracking branch 'origin/master'

luoyulong 2 years ago
parent
commit
7c4d3390f8

+ 1 - 0
app/core/datasource/datasource.py

@@ -1,5 +1,6 @@
 
 from dataclasses import dataclass
+from typing import Optional
 from app.utils import *
 
 

+ 20 - 7
app/core/datasource/mysql.py

@@ -9,13 +9,18 @@ from configs import logger
 from utils import flat_map
 
 
-@dataclass
 class MysqlDS(DataSourceBase):
-    type = 'mysql'
+
+    def __init__(self,  host: str, port: int, username: str, password: str, database_name: str, use_ssl: int, type: str='mysql'):
+        DataSourceBase.__init__(self, type, host, port, username, password, database_name, )
+        self.use_ssl = use_ssl
 
     @property
     def jdbc_url(self):
-        return f'jdbc:mysql://{self.host}:{self.port}/{self.database_name}'
+        jdbc =  f'jdbc:mysql://{self.host}:{self.port}/{self.database_name}'
+        if self.use_ssl == 0:
+            jdbc = f'{jdbc}?useSSL=false'
+        return jdbc
 
     @property
     def jdbc_driver_class(self):
@@ -29,11 +34,13 @@ class MysqlDS(DataSourceBase):
         # check whether the MySQL connection succeeds
         conn = None
         try:
+            use_ssl = False if self.use_ssl == 0 else True
             conn  = connector.connect(host=self.host,
                                     port=self.port,
                                     database=self.database_name,
                                     user=self.username,
-                                    password=self.password)
+                                    password=self.password,
+                                    ssl_disabled=not use_ssl)
             if conn.is_connected():
                 logger.info('Connected to MySQL database')
 
@@ -51,11 +58,13 @@ class MysqlDS(DataSourceBase):
         conn = None
         res = []
         try:
+            use_ssl = False if self.use_ssl == 0 else True
             conn  = connector.connect(host=self.host,
                                     port=self.port,
                                     database=self.database_name,
                                     user=self.username,
-                                    password=self.password)
+                                    password=self.password,
+                                    ssl_disabled=not use_ssl)
             cursor = conn.cursor()
             for sql in sqls:
                 cursor.execute(sql)
@@ -87,11 +96,15 @@ class MysqlDS(DataSourceBase):
 
 
     def list_tables(self):
-        sql = f'SELECT table_name FROM information_schema.tables WHERE table_type = "base table" AND table_schema="{self.database_name}"'
+        # table_type = "base table" AND
+        sql = f'SELECT table_name FROM information_schema.tables WHERE table_schema="{self.database_name}"'
         res = self._execute_sql([sql])
         return flat_map(lambda x: x, res[0])
 
     def get_table_schema(self, table_name):
+        def handle_col(x):
+            line = list(map(lambda s:  s.decode('utf-8') if type(s) == type(b'bytes') else  str(s), x))
+            return [':'.join(line[:3])]
         sql = f'describe `{self.database_name}`.{table_name}'
         logger.info(sql)
         res = self._execute_sql([sql])
@@ -99,7 +112,7 @@ class MysqlDS(DataSourceBase):
             res = [[str(i) , *x]for i, x in enumerate(res[0])]
             logger.info(res)
 
-            return flat_map(lambda x: [':'.join(x[:3])], res)
+            return flat_map(lambda x: handle_col(x), res)
         else:
             raise Exception('table not found')
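
For reference, a minimal sketch of how the new use_ssl flag is meant to behave, assuming a reachable MySQL server and the mysql-connector-python package added in environment.yml; open_mysql_connection and mysql_jdbc_url are made-up helper names, not part of the codebase:

    from mysql import connector  # mysql-connector-python

    def open_mysql_connection(host, port, database, user, password, use_ssl: int):
        # use_ssl mirrors the job_jdbc_datasource column: 0 disables SSL, any other value enables it
        return connector.connect(host=host, port=port, database=database,
                                  user=user, password=password,
                                  ssl_disabled=(use_ssl == 0))

    def mysql_jdbc_url(host, port, database, use_ssl: int):
        # mirrors MysqlDS.jdbc_url: append ?useSSL=false only when SSL is turned off
        url = f'jdbc:mysql://{host}:{port}/{database}'
        return url if use_ssl != 0 else f'{url}?useSSL=false'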
 

+ 32 - 9
app/core/datax/hdfs.py

@@ -4,7 +4,8 @@ from app.models import JobJdbcDatasource
 from app.schemas.datax_json import DataXJsonParam
 from app.utils import *
 
-
+from configs.settings import config
+import json
 
 {
     "writer": {
@@ -61,6 +62,12 @@ from app.utils import *
     }
 }
 
+
+def _build_hadoop_configs():
+    hadoop = config.get('HADOOP_INNER', 'hadoop_config')
+    kerberos = config.get('HADOOP_INNER', 'kerberos_config')
+    return json.loads(hadoop), json.loads(kerberos)
+
 class HdfsReader(WriterBase):
     def __init__(self, ds: JobJdbcDatasource):
         WriterBase.__init__(self, ds)
@@ -82,17 +89,25 @@ class HdfsReader(WriterBase):
         return res
 
     def _convert_type(self, type):
-        if type.lower() == 'int':
+        if 'int' in type.lower().strip():
             return 'long'
-        elif type.lower() == 'varchar':
+        elif 'varchar' in type.lower().strip():
             return 'string'
+        return 'string'
 
     def build_parameter(self, param: DataXJsonParam, is_show=True):
         parameter = dict()
-        parameter['path'] = param.hive_reader.reader_path
-        parameter['defaultFS'] = param.hive_reader.reader_default_fs
+        # Hive datasources that require Kerberos authentication
+        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
+            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
+            hadoop, kerberos = _build_hadoop_configs()
+            parameter['hadoopConfig'] = hadoop
+            parameter.update(kerberos)
+        else:
+            parameter['defaultFS'] = param.hive_reader.reader_default_fs
+        parameter['path'] = param.hive_reader.reader_path.strip()
         parameter['fileType'] = param.hive_reader.reader_file_type
-        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter
+        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter.strip()
         parameter['column'] = self._build_column(param.reader_columns)
         return parameter
 
@@ -128,14 +143,22 @@ class HdfsWriter(WriterBase):
         return res
 
 
+
     def build_parameter(self, param: DataXJsonParam, is_show=True):
         parameter = dict()
-        parameter['defaultFS'] = param.hive_writer.writer_default_fs
+        # Hive datasources that require Kerberos authentication
+        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
+            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
+            hadoop, kerberos = _build_hadoop_configs()
+            parameter['hadoopConfig'] = hadoop
+            parameter.update(kerberos)
+        else:
+            parameter['defaultFS'] = param.hive_writer.writer_default_fs
         parameter['fileType'] = param.hive_writer.writer_file_type
-        parameter['path'] = param.hive_writer.writer_path
+        parameter['path'] = param.hive_writer.writer_path.strip()
         parameter['fileName'] = param.hive_writer.writer_filename
         parameter['writeMode'] = param.hive_writer.writer_mode
-        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter
+        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter.strip()
         parameter['column'] = self._build_column(param.writer_columns)
         return parameter
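
As a rough illustration (the values below are placeholders, not real configuration), a writer parameter built for a datasource whose id is listed in [HADOOP_INNER] datasource would carry the inner defaultFS, the parsed hadoopConfig, and the merged kerberos_config keys:

    # illustrative shape only; actual values come from the HADOOP_INNER section and the DataX JSON param
    parameter = {
        'defaultFS': 'hdfs://HDFS8000912',                        # HADOOP_INNER.default_fs
        'hadoopConfig': {                                         # json.loads(HADOOP_INNER.hadoop_config)
            'dfs.nameservices': 'HDFS8000912',
            'dfs.ha.namenodes.HDFS8000912': 'nn1,nn2',
        },
        'haveKerberos': 'true',                                   # merged from HADOOP_INNER.kerberos_config
        'kerberosKeytabFilePath': '/workspace/confs/test/user.keytab',
        'kerberosPrincipal': 'ailab@EMR-5XJSY31F',
        'fileType': 'orc',                                        # placeholder
        'path': '/user/hive/warehouse/example.db/example_table',  # placeholder, whitespace stripped
        'fileName': 'example_table',                              # placeholder
        'writeMode': 'append',                                    # placeholder
        'fieldDelimiter': ',',                                    # placeholder, whitespace stripped
        'column': [{'name': 'id', 'type': 'long'}],               # placeholder
    }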
 

+ 15 - 52
app/crud/jm_homework.py

@@ -4,35 +4,14 @@ from app import models, schemas
 from sqlalchemy.orm import Session
 from app.crud.constant import find_and_update
 
-from app.crud.jm_homework_datasource_relation import create_jm_hd_relation, delete_jm_relations, get_jm_relations
-from app.services.jm_job import jm_homework_submit
+from app.crud.jm_homework_datasource_relation import get_jm_relations
 
 
-def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
-    jm_homework_create = item.dict()
-    db_item = db.query(models.JmHomework).filter(models.JmHomework.name == jm_homework_create['name'])\
-        .filter(models.JmHomework.status != 0).first()
-    if db_item:
-        raise Exception('作业名称已存在')
-    relation_list = []
-    if 'relation_list' in jm_homework_create.keys():
-        relation_list = jm_homework_create.pop('relation_list')
-    tag = jm_homework_create['tag']
-    find_and_update(db, '作业标签', tag)
-    create_time: int = int(time.time())
-    db_item = models.JmHomework(**jm_homework_create,**{
-        'create_time': create_time,
-        'update_time': create_time,
-        'status': 1
-    })
-    db.add(db_item)
+def create_jm_homework(db: Session, item: models.JmHomework):
+    db.add(item)
     db.commit()
-    db.refresh(db_item)
-    if jm_homework_create['type'] == 'Dag' and relation_list is not None:
-        for relation in relation_list:
-            create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
-    jm_homework_submit(db_item, db)
-    return db_item.to_dict()
+    db.refresh(item)
+    return item
 
 def get_jm_homeworks(db: Session, project_id: str):
     res: List[models.JmHomework] = db.query(models.JmHomework)\
@@ -49,34 +28,18 @@ def get_jm_homework_info(db: Session, homework_id: int):
         item.__dict__.update({"hd_relation":relations})
     return item
 
-def update_jm_homework(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
-    jm_homework_update =update_item.dict(exclude_unset=True)
-    db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
-    if not db_item:
-        raise Exception('未找到该作业')
-    db_name_item = db.query(models.JmHomework)\
-        .filter(models.JmHomework.name == jm_homework_update['name'])\
-        .filter(models.JmHomework.status != 0)\
-        .filter(models.JmHomework.id != id).first()
-    if db_name_item:
-        raise Exception('作业名称已存在')
-    relation_list = []
-    if 'relation_list' in jm_homework_update.keys():
-        relation_list = jm_homework_update.pop('relation_list')
-    tag = jm_homework_update['tag']
-    find_and_update(db, '作业标签', tag)
-    for k, v in jm_homework_update.items():
-        setattr(db_item, k, v)
-    db_item.update_time = int(time.time())
+
+def get_jm_homework_by_name(db: Session, name: str):
+    db_item = db.query(models.JmHomework).filter(models.JmHomework.name == name)\
+        .filter(models.JmHomework.status != 0).first()
+    return db_item
+
+
+def update_jm_homework(db: Session, id: int, update_item: models.JmHomework):
     db.commit()
     db.flush()
-    db.refresh(db_item)
-    delete_jm_relations(db,db_item.id)
-    if jm_homework_update['type'] == 'Dag' and relation_list is not None:
-        for relation in relation_list:
-            create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
-    jm_homework_submit(db_item, db)
-    return db_item.to_dict()
+    db.refresh(update_item)
+    return update_item
 
 def delete_jm_homework(db: Session, id: int):
     db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()

+ 8 - 61
app/crud/jm_job_info.py

@@ -3,37 +3,11 @@ from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
 
-from app.crud.constant import find_and_update
-from app.utils.cron_utils import *
-from app.services.jm_job import jm_job_submit
-
-def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
-    jm_job_info_create = item.dict()
-    cron_expression_item = jm_job_info_create.pop('cron_expression', None)
-    if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
-        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
-        cron_select_type = cron_expression_item["cron_select_type"]
-        jm_job_info_create.update({
-            'cron_select_type': cron_select_type,
-            'cron_expression': cron_expression,
-        })
-    nodes = jm_job_info_create.pop('nodes', None)
-    edges = jm_job_info_create.pop('edges', None)
-    db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
-        .filter(models.JmJobInfo.delete_status != 0).first()
-    if db_item:
-        raise Exception('定时任务名称已存在')
-    tag = jm_job_info_create['tag']
-    find_and_update(db, '任务标签', tag)
-    jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
-        'status': 0,
-        'delete_status': 1,
-    })
+def create_jm_job_info(db: Session, jm_job_info: models.JmJobInfo):
     db.add(jm_job_info)
     db.commit()
     db.refresh(jm_job_info)
-    jm_job_submit(jm_job_info, db)
-    return jm_job_info,nodes,edges
+    return jm_job_info
 
 def get_jm_job_infos(db: Session):
     res: List[models.JmJobInfo] = db.query(models.JmJobInfo)\
@@ -49,37 +23,11 @@ def get_jm_job_info(db: Session, jm_job_id: int):
         raise Exception('未找到该定时任务')
     return item
 
-def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
-    jm_job_info_update = item.dict(exclude_unset=True)
-    cron_expression_item = jm_job_info_update.pop('cron_expression', None)
-    if jm_job_info_update['cron_type'] == 2:
-        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
-        cron_select_type = cron_expression_item["cron_select_type"]
-        jm_job_info_update.update({
-            'cron_select_type': cron_select_type,
-            'cron_expression': cron_expression,
-        })
-    nodes = jm_job_info_update.pop('nodes', None)
-    edges = jm_job_info_update.pop('edges', None)
-    db_item = db.query(models.JmJobInfo)\
-        .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
-    if not db_item:
-        raise Exception('未找到该定时任务')
-    db_name_item = db.query(models.JmJobInfo)\
-        .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
-        .filter(models.JmJobInfo.delete_status != 0)\
-        .filter(models.JmJobInfo.id != item.id).first()
-    if db_name_item:
-        raise Exception('定时任务名称已存在')
-    tag = jm_job_info_update['tag']
-    find_and_update(db, '任务标签', tag)
-    for k, v in jm_job_info_update.items():
-        setattr(db_item, k, v)
+def update_jm_job_info(db: Session, item: models.JmJobInfo):
     db.commit()
     db.flush()
-    db.refresh(db_item)
-    jm_job_submit(db_item,db)
-    return db_item,nodes,edges
+    db.refresh(item)
+    return item
 
 def delete_jm_job_info(db: Session, jm_job_id: int):
     jm_job_info = db.query(models.JmJobInfo)\
@@ -94,14 +42,13 @@ def delete_jm_job_info(db: Session, jm_job_id: int):
     db.refresh(jm_job_info)
     return jm_job_info
 
-def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
+def update_jm_job_status(db: Session, jm_job_id: int, status: int):
     jm_job_info = db.query(models.JmJobInfo)\
-        .filter(models.JmJobInfo.id == item.id)\
+        .filter(models.JmJobInfo.id == jm_job_id)\
         .filter(models.JmJobInfo.delete_status != 0).first()
     if not jm_job_info:
         raise Exception('未找到该定时任务')
-    jm_job_info.status = item.status
-    jm_job_submit(jm_job_info,db)
+    jm_job_info.status = status
     db.commit()
     db.flush()
     db.refresh(jm_job_info)

+ 8 - 63
app/crud/job_info.py

@@ -2,80 +2,25 @@ import time
 from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
-
+from app.services.datax import datax_create_job
 from app.utils.cron_utils import *
 
 
-def create_job_info(db: Session, item: schemas.JobInfoCreate):
-    create_time: int = int(time.time())
-    item_dict = item.dict()
-    cron_expression_dict = item_dict.pop('cron_expression')
-    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
-    cron_select_type = cron_expression_dict["cron_select_type"]
-    item_dict.update({
-        'cron_select_type': cron_select_type,
-        'job_cron': cron_expression,
-    })
-    partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() and item_dict['partition_info'] != '' else None
-    partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() and item_dict['partition_time'] != '' else None
-    partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() and item_dict['partition_num'] != '' else None
-    partition_info_str = ''
-    if partition_info is not None and partition_time is not None and partition_num is not None:
-        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
-    elif partition_info is not None and (partition_time is None or partition_num is None):
-        raise Exception('分区信息不完善')
-    item_dict.update({
-        'partition_info': partition_info_str,
-    })
-    db_item = models.JobInfo(**item_dict, **{
-        'trigger_status': 0,
-        'create_time': create_time,
-        'update_time': create_time,
-        'delete_status': 1,
-    })
-    db.add(db_item)
+def create_job_info(db: Session, item: models.JobInfo):
+    db.add(item)
     db.commit()
-    db.refresh(db_item)
-    return db_item
-
-
+    db.refresh(item)
+    return item
 
 def get_job_infos(db: Session):
     res: List[models.JobInfo] = db.query(models.JobInfo).filter(models.JobInfo.delete_status == 1).all()  # TODO: sort
     return res
 
-
-
-def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate):
-    db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
-    if not db_item:
-        raise Exception('未找到该任务')
-    update_dict = update_item.dict(exclude_unset=True)
-    cron_expression_dict = update_dict.pop('cron_expression')
-    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
-    cron_select_type = cron_expression_dict["cron_select_type"]
-    update_dict.update({
-        'cron_select_type': cron_select_type,
-        'job_cron': cron_expression,
-    })
-    partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() and update_dict['partition_info'] != '' else None
-    partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys()  and update_dict['partition_time'] != '' else None
-    partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys()  and update_dict['partition_num'] != '' else None
-    partition_info_str = ''
-    if partition_info is not None and partition_time is not None and partition_num is not None:
-        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
-    elif partition_info is not None and (partition_time is None or partition_num is None):
-        raise Exception('分区信息不完善')
-    update_dict.update({
-        'partition_info': partition_info_str,
-    })
-    for k, v in update_dict.items():
-        setattr(db_item, k, v)
-    db_item.update_time = int(time.time())
+def update_job_info(db: Session, id: int, update_item: models.JobInfo):
     db.commit()
     db.flush()
-    db.refresh(db_item)
-    return db_item
+    db.refresh(update_item)
+    return update_item
 
 def update_job_trigger_status(db: Session, id: int, trigger_status: int):
     db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()

+ 4 - 2
app/crud/job_jdbc_datasource.py

@@ -8,8 +8,9 @@ from app.utils import decode_user
 
 
 def _decode(url, datasource, database_name):
-    return url.replace('jdbc:', '').replace('hive2://', '').replace(f'{datasource}://', '').replace(f'/{database_name}',
-                                                                                                    '')
+    url =  url.replace('jdbc:', '').replace('hive2://', '').replace(f'{datasource}://', '').replace(f'/{database_name}','')
+    return url.split('?')[0]
+
 
 
 def _format_datasource(db: Session, item: schemas.JobJdbcDatasourceBase, ds_id: int = 0):
@@ -36,6 +37,7 @@ def _format_datasource(db: Session, item: schemas.JobJdbcDatasourceBase, ds_id:
         ds = DataSrouceFactory.create(item.datasource, {'port': port, 'host': host, 'username': item.jdbc_username,
                                                     'password': item.jdbc_password,
                                                     'database_name': item.database_name,
+                                                    'use_ssl': item.use_ssl
                                                     })
     item.jdbc_url = ds.jdbc_url
     item.jdbc_username = ds.jdbc_username if item.kerberos == 0 else None
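
A quick worked example of what the revised _decode now yields once the query string added for ?useSSL=false is stripped (host and database below are made up):

    def _decode(url, datasource, database_name):
        # same logic as app/crud/job_jdbc_datasource.py
        url = url.replace('jdbc:', '').replace('hive2://', '').replace(f'{datasource}://', '').replace(f'/{database_name}', '')
        return url.split('?')[0]

    assert _decode('jdbc:mysql://10.0.0.1:3306/demo?useSSL=false', 'mysql', 'demo') == '10.0.0.1:3306'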

+ 2 - 1
app/models/database.py

@@ -15,8 +15,9 @@ PWD = config.get('DATABASE', 'pwd')
 DB_NAME = config.get('DATABASE', 'DB_NAME')
 HOST = config.get('DATABASE', 'HOST')
 PORT = config.get('DATABASE', 'PORT')
+SSL_DISABLED = config.get('DATABASE', 'SSL_DISABLED')
 
-SQLALCHEMY_DATABASE_URL = f'mysql+mysqlconnector://{USER}:{PWD}@{HOST}:{PORT}/{DB_NAME}?charset=utf8&auth_plugin=mysql_native_password'
+SQLALCHEMY_DATABASE_URL = f'mysql+mysqlconnector://{USER}:{PWD}@{HOST}:{PORT}/{DB_NAME}?ssl_disabled={SSL_DISABLED}&charset=utf8&auth_plugin=mysql_native_password'
 engine = create_engine(
     SQLALCHEMY_DATABASE_URL, pool_pre_ping=True
 )

+ 2 - 0
app/models/job_jdbc_datasource.py

@@ -45,4 +45,6 @@ class JobJdbcDatasource(BaseModel):
     kerberos_service_name = Column(String)
     # principal
     principal = Column(String)
+    # use_ssl
+    use_ssl = Column(Integer)
 

+ 0 - 55
app/routers/dag_file.py

@@ -1,55 +0,0 @@
-import io
-from multiprocessing.connection import wait
-import os
-from fastapi import APIRouter
-
-from utils.sx_time import sxtimeit
-from utils.sx_web import web_try
-from fastapi.responses import StreamingResponse
-
-router = APIRouter(
-    prefix="/jpt/dag_file",
-    tags=["dag-dag管理"],
-)
-
-
-@router.get("/")
-@web_try()
-@sxtimeit
-def get_dags(project_id: str, user_id: str):
-    path = f"./{user_id}/dag"
-    file_list = get_all_dags(path)
-    return file_list
-
-
-def get_all_dags(path):
-    file_list = []
-    files = os.listdir(path)
-    for file in files:
-        if file == ".DS_Store":
-            continue
-        next_path = path + '/' + file
-        if os.path.isdir(next_path):
-            n_file_list = get_all_dags(next_path)
-            file_list.extend(n_file_list)
-        else:
-            if file[-4:] == ".dag":
-                file_list.append(next_path.replace('./dag', ''))
-    return file_list
-
-
-@router.get("/info")
-@sxtimeit
-def get_file(uri: str):
-    response = StreamingResponse(get_file_byte('./dag' + uri))
-    return response
-
-
-def get_file_byte(filename, chunk_size=1024):
-    with open(filename, "rb") as f:
-        while True:
-            content = f.read(chunk_size)
-            if content:
-                yield content
-            else:
-                break

+ 1 - 1
app/routers/data_management.py

@@ -29,7 +29,7 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
-
+    current_time = int(time.time())
     table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
     tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
     af_run_id = data_transfer_run(tmp_table_name, table_name)

+ 32 - 9
app/routers/jm_homework.py

@@ -5,15 +5,17 @@ from fastapi import APIRouter
 from fastapi import Depends
 from sqlalchemy.orm import Session
 from app import schemas
-
-import app.crud as crud
+from app.common.hive import hiveDs
 from app.crud import jm_homework
+from app.services.jm_homework import create_jm_homework_services, update_jm_homework_service
 from app.services.jm_job import red_dag_and_format
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from fastapi_pagination import Page, add_pagination, paginate, Params
-
+import app.crud as crud
 from app import get_db
+from configs.settings import DefaultOption, config
+DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
 
 
 
@@ -26,9 +28,7 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def create_jm_homework(item: schemas.JmHomeworkCreate, db: Session = Depends(get_db)):
-    # 根据获取到的文件路径另存一份并改变
-    jm_homework = crud.create_jm_homework(db, item)
-    return jm_homework
+    return create_jm_homework_services(db, item)
 
 @router.get("/")
 @web_try()
@@ -46,8 +46,7 @@ def get_jm_homework_info(homework_id: str, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def update_jm_homework(jm_id: int, update_item: schemas.JmHomeworkUpdate, db: Session = Depends(get_db)):
-    # 根据获取到的文件路径另存一份并改变
-    return crud.update_jm_homework(db, jm_id, update_item)
+    return update_jm_homework_service(db, jm_id, update_item)
 
 @router.delete("/")
 @web_try()
@@ -65,4 +64,28 @@ def delete_jm_homework(jm_id: int, db: Session = Depends(get_db)):
 def get_test_dag(db: Session = Depends(get_db)):
     jm_homework = crud.get_jm_homework_info(db, 83)
     res = red_dag_and_format(jm_homework, db)
-    return res
+    return res
+
+@router.get("/local_source")
+@web_try()
+@sxtimeit
+def get_local_source():
+    return [{
+            'database_name': DATABASE_NAME,
+            'datasource': "hive",
+            'datasource_name': DATABASE_NAME,
+            'id': -1
+    }]
+
+@router.get("/local_source_table")
+@web_try()
+@sxtimeit
+def get_local_source_table():
+    t_list = hiveDs.list_tables()
+    return t_list
+
+@router.get("/local_source_table_schema")
+@web_try()
+@sxtimeit
+def get_local_source_table_schema(table_name: str, db: Session = Depends(get_db)):
+    return hiveDs.get_table_schema(table_name)

+ 10 - 40
app/routers/jm_job_info.py

@@ -9,6 +9,8 @@ from app import models, schemas
 
 import app.crud as crud
 from app.schemas import cron_expression
+from app.services.jm_job import on_off_control
+from app.services.jm_job_info import create_jm_job_info_services, execute_job_services, update_jm_job_info_services, update_jm_job_status_services
 from app.utils.cron_utils import *
 from app.utils.send_util import send_delete, send_execute
 from utils.sx_time import sxtimeit
@@ -28,37 +30,8 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
-    jm_job_info,nodes,edges = crud.create_jm_job_info(db, item)
-    job_id = jm_job_info.id
-    create_jm_job_node(db, nodes, edges, job_id)
-    return jm_job_info.to_dict()
-
-
-def create_jm_job_node(db: Session, nodes, edges, job_id):
-    uuid_node_id = {}
-    if nodes is None or len(nodes) == 0:
-        return
-    for node in nodes:
-        uuid = node['id']
-        node_item = models.JmJobNode(**{
-            'job_id': job_id,
-            'homework_id': node['homework_id'],
-            'homework_name': node['homework_name'],
-            'start_point': 1,
-        })
-        node_item = crud.create_jm_job_node(db,node_item)
-        node_id = node_item.id
-        uuid_node_id.update({uuid:node_id})
-    if nodes is None or len(nodes) == 0:
-        return
-    for edge in edges:
-        edge_item = models.JmJobEdge(**{
-            'job_id': job_id,
-            'in_node_id': uuid_node_id[edge['source']],
-            'out_node_id': uuid_node_id[edge['target']]
-        })
-        edge = crud.create_jm_job_edge(db,edge_item)
-    return
+    return create_jm_job_info_services(db, item)
+
 
 @router.get("/")
 @web_try()
@@ -126,17 +99,15 @@ def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
-    jm_job_info,nodes,edges = crud.update_jm_job_info(db, item)
-    job_id = jm_job_info.id
-    crud.delete_job_node(db, job_id)
-    job_id = jm_job_info.id
-    create_jm_job_node(db, nodes, edges, job_id)
-    return jm_job_info.to_dict()
+    return update_jm_job_info_services(db, item)
 
 @router.delete("/")
 @web_try()
 @sxtimeit
 def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
+    jm_job = crud.get_jm_job_info(db,jm_job_id)
+    if jm_job.status == 1:
+        raise Exception('任务未停用,不可删除')
     relation = crud.get_af_id(db, jm_job_id, 'job')
     send_delete('/af/af_job', relation.af_id)
     return crud.delete_jm_job_info(db,jm_job_id)
@@ -145,7 +116,7 @@ def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
 @web_try()
 @sxtimeit
 def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
-    return crud.update_jm_job_status(db,item)
+    return update_jm_job_status_services(db, item.id, item.status)
 
 @router.post("/execute/{jm_job_id}")
 @web_try()
@@ -154,8 +125,7 @@ def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
     jm_job = crud.get_jm_job_info(db,jm_job_id)
     if jm_job.status == 0:
         raise Exception('任务已被停用')
-    relation = crud.get_af_id(db, jm_job_id, 'job')
-    res = send_execute(relation.af_id)
+    res = execute_job_services(db,jm_job_id)
     return res['data']
 
 

+ 9 - 14
app/routers/job_info.py

@@ -9,7 +9,8 @@ from app import models, page_help, schemas
 
 import app.crud as crud
 from app.crud import job_info
-from app.services.datax import datax_create_job, datax_update_job
+from app.services.datax import datax_create_job, datax_update_job, on_off_control
+from app.services.job_info import create_job_info_services, execute_job_services, update_job_info_services
 from app.utils.cron_utils import parsing_cron_expression
 from app.utils.send_util import *
 from app.utils.utils import *
@@ -32,9 +33,7 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
-    return crud.create_job_info(db, item)
-
-
+    return create_job_info_services(db,item)
 
 @router.get("/")
 @web_try()
@@ -68,14 +67,11 @@ def get_job_info(job_id: int, db: Session = Depends(get_db)):
         })
     return job_info_dict
 
-
-
 @router.put("/{id}")
 @web_try()
 @sxtimeit
 def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
-    job_info = crud.update_job_info(db, id, update_item)
-    return job_info
+    return update_job_info_services(db, id, update_item)
 
 @router.put("/update_trigger_status/")
 @web_try()
@@ -84,10 +80,7 @@ def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depe
     job_info = crud.get_job_info(db, item.id)
     relation = crud.get_af_id(db, job_info.id, 'datax')
     job_info.trigger_status = item.trigger_status
-    if not relation:
-        datax_create_job(job_info,db)
-    else:
-        datax_update_job(job_info,db)
+    on_off_control(relation.af_id, item.trigger_status)
     job_info = crud.update_job_trigger_status(db, item.id, item.trigger_status)
     return job_info
 
@@ -95,6 +88,9 @@ def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depe
 @web_try()
 @sxtimeit
 def delete_job_info(job_id: int, db: Session = Depends(get_db)):
+    jm_job = crud.get_job_info(db, job_id)
+    if jm_job.trigger_status == 1:
+        raise Exception('任务未停用,不可删除')
     relation = crud.get_af_id(db, job_id, 'datax')
     send_delete('/af/af_job', relation.af_id)
     return crud.delete_job_info(db, job_id)
@@ -106,8 +102,7 @@ def execute_job_info(job_id: int, db: Session = Depends(get_db)):
     jm_job = crud.get_job_info(db, job_id)
     if jm_job.trigger_status == 0:
         raise Exception('任务已被停用')
-    relation = crud.get_af_id(db, job_id, 'datax')
-    res = send_execute(relation.af_id)
+    res = execute_job_services(db, job_id)
     return res['data']
 
 add_pagination(router)

+ 4 - 0
app/schemas/job_jdbc_datasouce.py

@@ -30,6 +30,8 @@ class JobJdbcDatasourceBase(BaseModel):
     kerberos_service_name: Optional[str]
     # principal
     principal: Optional[str]
+    # use_ssl
+    use_ssl: Optional[int] = 0
     class Config:
         schema_extra = {
             # "example": {
@@ -56,6 +58,7 @@ class JobJdbcDatasourceBase(BaseModel):
                 "krb5config": "test/kerberos/user.conf",
                 "kerberos_service_name": "hadoop",
                 "principal": "ailab@EMR-5XJSY31F",
+                "use_ssl": 0,
             }
         }
 
@@ -93,6 +96,7 @@ class JobJdbcDatasourceUpdate(JobJdbcDatasourceBase):
                 "krb5config": "test/kerberos/user.conf",
                 "kerberos_service_name": "hadoop",
                 "principal": "ailab@EMR-5XJSY31F",
+                "use_ssl": 0,
                 "status": 1,
             }
         }

+ 5 - 4
app/services/dag.py

@@ -93,18 +93,19 @@ def dag_job_submit(dag_uuid:str,dag_script: str,db: Session):
         af_job = dag_create_job(dag_uuid, dag_script, db)
     else:
         af_job = dag_update_job(dag_uuid, dag_script, db)
-    current_time = int(time.time())
+    res = get_job_last_parsed_time(af_job['id'])
+    current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None
     send_submit(af_job['id'])
-    for i in range(0,11):
+    for i in range(0,21):
         time.sleep(2)
         res = get_job_last_parsed_time(af_job['id'])
         last_parsed_time = res['data']['last_parsed_time']
-        if last_parsed_time and current_time < int(last_parsed_time):
+        if last_parsed_time and current_time != last_parsed_time:
             send_pause(af_job['id'],1)
             send_execute(af_job['id'])
             print(f"{af_job['id']}<==执行成功==>{last_parsed_time}")
             break
-        if i >= 10:
+        if i >= 20:
             raise Exception(f"{af_job['id']}==>执行失败")
     return af_job
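
The reworked wait loop above, like the new execute_job helpers in app/services/datax.py and app/services/jm_job.py below, follows one pattern: remember Airflow's last_parsed_time before submitting the DAG file, then poll until the reported value changes. A standalone sketch of that pattern, assuming the send_submit and get_job_last_parsed_time helpers from app.utils.send_util used elsewhere in this commit:

    import time
    from app.utils.send_util import get_job_last_parsed_time, send_submit

    def wait_for_reparse(af_job_id: int, attempts: int = 21, interval: int = 2):
        # hypothetical helper: block until Airflow has re-parsed the submitted DAG file
        before = get_job_last_parsed_time(af_job_id)['data'].get('last_parsed_time')
        send_submit(af_job_id)
        for _ in range(attempts):
            time.sleep(interval)
            now = get_job_last_parsed_time(af_job_id)['data'].get('last_parsed_time')
            if now and now != before:
                return now
        raise Exception(f'{af_job_id}==>DAG file was not re-parsed in time')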
 

+ 17 - 15
app/services/datax.py

@@ -25,10 +25,8 @@ def datax_create_job(job_info: models.JobInfo, db: Session):
     }
     res = send_post('/af/af_job', af_job)
     af_job = res['data']
-    crud.create_relation(db, job_info.id,'datax', af_job['id'])
     send_submit(af_job['id'])
-    get_job_last_parsed_time()
-    # on_off_control(af_job['id'], job_info.trigger_status)
+    return af_job
 
 def datax_create_task(job_info: models.JobInfo):
     cmd_parameter = get_cmd_parameter(job_info.jvm_param)
@@ -61,7 +59,6 @@ def datax_create_task(job_info: models.JobInfo):
     af_task = res['data']
     return af_task
 
-
 def datax_update_job(job_info: models.JobInfo, db: Session):
     relation = crud.get_af_id(db, job_info.id, 'datax')
     af_job_id = relation.af_id
@@ -86,8 +83,7 @@ def datax_update_job(job_info: models.JobInfo, db: Session):
     res = send_put('/af/af_job', old_af_job['id'], af_job)
     af_job = res['data']
     send_submit(af_job['id'])
-    on_off_control(af_job['id'], job_info.trigger_status)
-
+    return af_job
 
 def datax_put_task(job_info: models.JobInfo,old_af_task):
     cmd_parameter = get_cmd_parameter(job_info.jvm_param)
@@ -118,15 +114,6 @@ def datax_put_task(job_info: models.JobInfo,old_af_task):
     af_task = res['data']
     return af_task
 
-
-def datax_job_submit(job_info: models.JobInfo, db: Session):
-    relation = crud.get_af_id(db, job_info.id, 'datax')
-    if not relation:
-        datax_create_job(job_info,db)
-    else:
-        datax_update_job(job_info,db)
-
-
 def on_off_control(af_job_id: int,status: int):
     for i in range(0,11):
         parsed_res = get_job_last_parsed_time(af_job_id)
@@ -137,4 +124,19 @@ def on_off_control(af_job_id: int,status: int):
             break
         if i >= 10:
             raise Exception(f"{af_job_id}==>状态修改失败")
+        time.sleep(2)
+
+def execute_job(af_job_id: int):
+    res = get_job_last_parsed_time(af_job_id)
+    current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None
+    send_submit(af_job_id)
+    for i in range(0,21):
+        parsed_res = get_job_last_parsed_time(af_job_id)
+        last_parsed_time = parsed_res['data']['last_parsed_time']
+        if last_parsed_time and last_parsed_time != current_time:
+            res = send_execute(af_job_id)
+            print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
+            return res
+        if i >= 20:
+            raise Exception(f"{af_job_id}==>文件正在转化中")
         time.sleep(2)

+ 63 - 0
app/services/jm_homework.py

@@ -0,0 +1,63 @@
+import time
+from app import models, schemas
+from sqlalchemy.orm import Session
+import app.crud as crud
+from app.services.jm_job import jm_job_create_task, jm_job_update_task
+
+def create_jm_homework_services(db: Session, item: schemas.JmHomeworkCreate):
+    jm_homework_create = item.dict()
+    db_item = crud.get_jm_homework_by_name(db,jm_homework_create['name'])
+    if db_item:
+        raise Exception('作业名称已存在')
+    relation_list = []
+    if 'relation_list' in jm_homework_create.keys():
+        relation_list = jm_homework_create.pop('relation_list')
+    tag = jm_homework_create['tag']
+    crud.find_and_update(db, '作业标签', tag)
+    create_time: int = int(time.time())
+    db_item = models.JmHomework(**jm_homework_create,**{
+        'create_time': create_time,
+        'update_time': create_time,
+        'status': 1
+    })
+    # create the Airflow-side task first
+    af_task = jm_job_create_task(db_item, relation_list, db)
+    # then create the local homework record
+    db_item = crud.create_jm_homework(db,db_item)
+    # for Dag homework with datasource relations, create the relation rows
+    if jm_homework_create['type'] == 'Dag' and relation_list is not None:
+        for relation in relation_list:
+            crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
+    # record the homework-to-airflow-task relation
+    crud.create_relation(db ,db_item.id, 'task', af_task['id'])
+    return db_item.to_dict()
+
+def update_jm_homework_service(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
+    jm_homework_update =update_item.dict(exclude_unset=True)
+    db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
+    if not db_item:
+        raise Exception('未找到该作业')
+    db_name_item = db.query(models.JmHomework)\
+        .filter(models.JmHomework.name == jm_homework_update['name'])\
+        .filter(models.JmHomework.status != 0)\
+        .filter(models.JmHomework.id != id).first()
+    if db_name_item:
+        raise Exception('作业名称已存在')
+    relation_list = []
+    if 'relation_list' in jm_homework_update.keys():
+        relation_list = jm_homework_update.pop('relation_list')
+    tag = jm_homework_update['tag']
+    crud.find_and_update(db, '作业标签', tag)
+    for k, v in jm_homework_update.items():
+        setattr(db_item, k, v)
+    db_item.update_time = int(time.time())
+    # update the Airflow-side task
+    af_task = jm_job_update_task(db_item, relation_list, db)
+    # update the local homework record
+    db_item = crud.update_jm_homework(db, id, db_item)
+    # rebuild the datasource relations
+    crud.delete_jm_relations(db,db_item.id)
+    if jm_homework_update['type'] == 'Dag' and relation_list is not None:
+        for relation in relation_list:
+            crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
+    return db_item.to_dict()

+ 47 - 44
app/services/jm_job.py

@@ -2,13 +2,15 @@ from asyncio import current_task
 import json
 import time
 from turtle import update
-from app import crud, models
+from app import crud, models, schemas
 from app.common import minio
 from app.core.datasource.datasource import DataSourceBase
 from app.crud.jm_homework_datasource_relation import get_jm_relations
 from app.utils.send_util import *
 from sqlalchemy.orm import Session
 from app.common.minio import minio_client
+from configs.settings import DefaultOption, config
+DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
 
 type_dict = {
     "Java": "java",
@@ -16,10 +18,10 @@ type_dict = {
     "Dag": "sparks"
 }
 
-def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
+def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Session):
     content = ''
     if jm_homework.type == "Dag":
-        content = red_dag_and_format(jm_homework, db)
+        content = red_dag_and_format(jm_homework, relation_list, db)
     elif jm_homework.type == "Python":
         content = red_python_and_format(jm_homework)
     af_task = {
@@ -35,14 +37,13 @@ def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
     }
     res = send_post('/af/af_task', af_task)
     af_task = res['data']
-    crud.create_relation(db ,jm_homework.id, 'task', af_task['id'])
     return af_task
 
-def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
+def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Session):
     relation = crud.get_af_id(db, jm_homework.id, 'task')
     content = ''
     if jm_homework.type == "Dag":
-        content = content = red_dag_and_format(jm_homework, db)
+        content = content = red_dag_and_format(jm_homework, relation_list, db)
     elif jm_homework.type == "Python":
         content = red_python_and_format(jm_homework)
     af_task = {
@@ -60,21 +61,12 @@ def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
     af_task = res['data']
     return af_task
 
-def jm_homework_submit(jm_homework: models.JmHomework, db: Session):
-    task_relation = crud.get_af_id(db,jm_homework.id,'task')
-    if task_relation is None:
-        jm_job_create_task(jm_homework, db)
-    else:
-        jm_job_update_task(jm_homework, db)
-
-def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
-    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
-    homework_ids = [node.homework_id for node in nodes]
+def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
+    homework_ids = [node['homework_id'] for node in nodes]
     relations = crud.get_af_ids(db,homework_ids, 'task')
     se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
-    edges = crud.get_one_job_edges(db, jm_job_info.id)
-    dependence = [[se_id_to_af_id_dict[edge['in_node_id']],se_id_to_af_id_dict[str(edge['out_node_id'])]] for edge in edges]
+    dependence = [[se_id_to_af_id_dict[edge['source']],se_id_to_af_id_dict[str(edge['target'])]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
     cron.replace('?','*')
     af_job = {
@@ -94,20 +86,16 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
     }
     res = send_post('/af/af_job', af_job)
     af_job = res['data']
-    crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
     send_submit(af_job['id'])
-    # on_off_control(af_job['id'],jm_job_info.status)
+    return af_job
 
 
-def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
-    nodes = crud.get_one_job_nodes(db, jm_job_info.id)
-    homework_ids = [node.homework_id for node in nodes]
-    node_id_to_h_id = {node.id:node.homework_id for node in nodes}
+def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
+    homework_ids = [node['homework_id'] for node in nodes]
     relations = crud.get_af_ids(db,homework_ids, 'task')
     se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
-    edges = crud.get_one_job_edges(db, jm_job_info.id)
-    dependence = [[se_id_to_af_id_dict[node_id_to_h_id[edge.in_node_id]],se_id_to_af_id_dict[node_id_to_h_id[edge.out_node_id]]] for edge in edges]
+    dependence = [[se_id_to_af_id_dict[edge['source']],se_id_to_af_id_dict[str(edge['target'])]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
     cron.replace('?','*')
     af_job = {
@@ -126,20 +114,9 @@ def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
     res = send_put('/af/af_job', job_relation.af_id, af_job)
     af_job = res['data']
     send_submit(af_job['id'])
-    on_off_control(af_job['id'],jm_job_info.status)
-
-
-def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
-    job_relation = crud.get_af_id(db,jm_job_info.id,'job')
-    if job_relation is None:
-        jm_job_create_job(jm_job_info, db)
-    else:
-        jm_job_update_job(jm_job_info, db)
-
 
-def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
-    relations = get_jm_relations(db,jm_homework.id)
-    node_relation_dict = { relation.node_uuid:relation for relation in relations}
+def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Session):
+    node_relation_dict = { relation['node_uuid']:relation for relation in relation_list}
     file = minio_client.get_file(jm_homework.dag_url)
     result = json.loads(file)
     edges = result['edges']
@@ -159,8 +136,14 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             for filed in fileds:
                 script += filed['dataField'] + ','
             script = script.strip(',')
-            data_source = crud.get_job_jdbc_datasource(db,node_relation_dict[node['id']].datasource_id)
-            script += ' from ' + data_source.database_name + '.'+node_relation_dict[node['id']].table+''
+            ds_id = node_relation_dict[node['id']]['datasource_id']
+            database_name = ""
+            if ds_id  == -1:
+                database_name = DATABASE_NAME
+            else:
+                data_source = crud.get_job_jdbc_datasource(db,ds_id)
+                database_name = data_source.database_name
+            script += ' from ' + database_name + '.'+node_relation_dict[node['id']]['table']
             sub_node = {
                 "id": node['id'],
                 "name": node['name'],
@@ -170,9 +153,15 @@ def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
             sub_nodes.append(sub_node)
         elif node['op'] == 'outputsource':
             fileds = node['data']['output_source']
-            data_source = crud.get_job_jdbc_datasource(db,node_relation_dict[node['id']].datasource_id)
+            ds_id = node_relation_dict[node['id']]['datasource_id']
+            database_name = ""
+            if ds_id  == -1:
+                database_name = DATABASE_NAME
+            else:
+                data_source = crud.get_job_jdbc_datasource(db,ds_id)
+                database_name = data_source.database_name
             script = '''def main_func (input0, spark,sc):
-    input0.write.mode("overwrite").saveAsTable("''' + data_source.database_name + '.'+node_relation_dict[node['id']].table+'''")'''
+    input0.write.mode("overwrite").saveAsTable("''' + database_name + '.'+node_relation_dict[node['id']]['table']+'''")'''
             inputs = {}
             index = 0
             input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
@@ -220,7 +209,6 @@ def red_python_and_format(jm_homework):
     file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
     return file.decode("utf-8")
 
-
 def on_off_control(af_job_id: int,status: int):
     for i in range(0,11):
         parsed_res = get_job_last_parsed_time(af_job_id)
@@ -231,4 +219,19 @@ def on_off_control(af_job_id: int,status: int):
             break
         if i >= 10:
             raise Exception(f"{af_job_id}==>执行失败")
+        time.sleep(2)
+
+def execute_job(af_job_id: int):
+    res = get_job_last_parsed_time(af_job_id)
+    current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None
+    send_submit(af_job_id)
+    for i in range(0,21):
+        parsed_res = get_job_last_parsed_time(af_job_id)
+        last_parsed_time = parsed_res['data']['last_parsed_time']
+        if last_parsed_time and last_parsed_time != current_time:
+            res = send_execute(af_job_id)
+            print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
+            return res
+        if i >= 20:
+            raise Exception(f"{af_job_id}==>文件正在转化中")
         time.sleep(2)
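
In red_dag_and_format above, a datasource_id of -1 (the sentinel returned by the new /local_source route) now resolves to the local Hive database from [HIVE] database_name instead of a job_jdbc_datasource row. The duplicated branch could be captured in a helper along these lines; resolve_database_name is hypothetical, not part of the commit:

    import app.crud as crud
    from configs.settings import config

    DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')

    def resolve_database_name(db, ds_id: int) -> str:
        # -1 means the built-in local Hive source; anything else is a stored datasource
        if ds_id == -1:
            return DATABASE_NAME
        return crud.get_job_jdbc_datasource(db, ds_id).database_name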

+ 118 - 0
app/services/jm_job_info.py

@@ -0,0 +1,118 @@
+from sqlalchemy.orm import Session
+from app import models, schemas
+from app.services.jm_job import execute_job, jm_job_create_job, jm_job_update_job, on_off_control
+from app.utils.cron_utils import joint_cron_expression
+import app.crud as crud
+
+def create_jm_job_info_services(db: Session, item: schemas.JmJobInfoCreate):
+    jm_job_info_create = item.dict()
+    # convert the schedule object into a cron expression
+    cron_expression_item = jm_job_info_create.pop('cron_expression', None)
+    if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None:
+        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
+        cron_select_type = cron_expression_item["cron_select_type"]
+        jm_job_info_create.update({
+            'cron_select_type': cron_select_type,
+            'cron_expression': cron_expression,
+        })
+    # split nodes and edges out of the payload
+    nodes = jm_job_info_create.pop('nodes', None)
+    edges = jm_job_info_create.pop('edges', None)
+    db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\
+        .filter(models.JmJobInfo.delete_status != 0).first()
+    if db_item:
+        raise Exception('定时任务名称已存在')
+    # persist the tag
+    tag = jm_job_info_create['tag']
+    crud.find_and_update(db, '任务标签', tag)
+    jm_job_info = models.JmJobInfo(**jm_job_info_create,**{
+        'status': 0,
+        'delete_status': 1,
+    })
+    # create the Airflow-side job
+    af_job = jm_job_create_job(jm_job_info,nodes,edges,db)
+    # create the local job record
+    jm_job_info = crud.create_jm_job_info(db,jm_job_info)
+    # create the job nodes and their relations
+    create_jm_job_node(db, nodes, edges, jm_job_info.id)
+    # record the job-to-airflow-job relation
+    crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
+    return jm_job_info
+
+
+def update_jm_job_info_services(db: Session, item: schemas.JmJobInfoUpdate):
+    jm_job_info_update = item.dict(exclude_unset=True)
+    # convert the schedule object into a cron expression
+    cron_expression_item = jm_job_info_update.pop('cron_expression', None)
+    if jm_job_info_update['cron_type'] == 2:
+        cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item))
+        cron_select_type = cron_expression_item["cron_select_type"]
+        jm_job_info_update.update({
+            'cron_select_type': cron_select_type,
+            'cron_expression': cron_expression,
+        })
+    # split nodes and edges out of the payload
+    nodes = jm_job_info_update.pop('nodes', None)
+    edges = jm_job_info_update.pop('edges', None)
+    db_item = db.query(models.JmJobInfo)\
+        .filter(models.JmJobInfo.id == jm_job_info_update['id']).first()
+    if not db_item:
+        raise Exception('未找到该定时任务')
+    db_name_item = db.query(models.JmJobInfo)\
+        .filter(models.JmJobInfo.name == jm_job_info_update['name'])\
+        .filter(models.JmJobInfo.delete_status != 0)\
+        .filter(models.JmJobInfo.id != item.id).first()
+    if db_name_item:
+        raise Exception('定时任务名称已存在')
+    # persist the tag
+    tag = jm_job_info_update['tag']
+    crud.find_and_update(db, '任务标签', tag)
+    for k, v in jm_job_info_update.items():
+        setattr(db_item, k, v)
+    # update the Airflow-side job
+    af_job = jm_job_update_job(db_item,nodes,edges,db)
+    # update the local job record
+    db_item = crud.update_jm_job_info(db,db_item)
+    # delete the previous job nodes, then recreate nodes and edges
+    crud.delete_job_node(db, db_item.id)
+    create_jm_job_node(db, nodes, edges, db_item.id)
+    return db_item
+
+
+def create_jm_job_node(db: Session, nodes, edges, job_id):
+    uuid_node_id = {}
+    if nodes is None or len(nodes) == 0:
+        return
+    for node in nodes:
+        uuid = node['id']
+        node_item = models.JmJobNode(**{
+            'job_id': job_id,
+            'homework_id': node['homework_id'],
+            'homework_name': node['homework_name'],
+            'start_point': 1,
+        })
+        node_item = crud.create_jm_job_node(db,node_item)
+        node_id = node_item.id
+        uuid_node_id.update({uuid:node_id})
+    if nodes is None or len(nodes) == 0:
+        return
+    for edge in edges:
+        edge_item = models.JmJobEdge(**{
+            'job_id': job_id,
+            'in_node_id': uuid_node_id[edge['source']],
+            'out_node_id': uuid_node_id[edge['target']]
+        })
+        edge = crud.create_jm_job_edge(db,edge_item)
+    return
+
+
+def update_jm_job_status_services(db: Session, job_id: int, status: int):
+    job_relation = crud.get_af_id(db,job_id,'job')
+    on_off_control(job_relation.af_id, status)
+    return crud.update_jm_job_status(db,job_id,status)
+
+def execute_job_services(db: Session, jm_job_id: int):
+    relation = crud.get_af_id(db, jm_job_id, 'job')
+    res = execute_job(relation.af_id)
+    return res
+

+ 80 - 0
app/services/job_info.py

@@ -0,0 +1,80 @@
+import time
+from app import models, schemas
+from app.services.datax import datax_create_job, datax_update_job, execute_job
+from app.utils.cron_utils import joint_cron_expression
+from sqlalchemy.orm import Session
+import app.crud as crud
+
+def create_job_info_services(db: Session, item: schemas.JobInfoCreate):
+    create_time: int = int(time.time())
+    item_dict = item.dict()
+    # convert the schedule object into a cron expression
+    cron_expression_dict = item_dict.pop('cron_expression')
+    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
+    cron_select_type = cron_expression_dict["cron_select_type"]
+    item_dict.update({
+        'cron_select_type': cron_select_type,
+        'job_cron': cron_expression,
+    })
+    # assemble the partition info string
+    partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() and item_dict['partition_info'] != '' else None
+    partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() and item_dict['partition_time'] != '' else None
+    partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() and item_dict['partition_num'] != '' else None
+    partition_info_str = ''
+    if partition_info is not None and partition_time is not None and partition_num is not None:
+        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
+    elif partition_info is not None and (partition_time is None or partition_num is None):
+        raise Exception('分区信息不完善')
+    item_dict.update({
+        'partition_info': partition_info_str,
+    })
+    db_item = models.JobInfo(**item_dict, **{
+        'trigger_status': 0,
+        'create_time': create_time,
+        'update_time': create_time,
+        'delete_status': 1,
+    })
+    # create the Airflow-side sync job
+    af_job = datax_create_job(db_item, db)
+    # create the local sync job record
+    db_item = crud.create_job_info(db, db_item)
+    crud.create_relation(db, db_item.id,'datax', af_job['id'])
+    return db_item
+
+
+def update_job_info_services(db: Session, id: int, update_item: schemas.JobInfoUpdate):
+    # load the existing job record
+    db_item = crud.get_job_info(db,id)
+    update_dict = update_item.dict(exclude_unset=True)
+    # convert the schedule object into a cron expression
+    cron_expression_dict = update_dict.pop('cron_expression')
+    cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
+    cron_select_type = cron_expression_dict["cron_select_type"]
+    update_dict.update({
+        'cron_select_type': cron_select_type,
+        'job_cron': cron_expression,
+    })
+    # assemble the partition info string
+    partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() and update_dict['partition_info'] != '' else None
+    partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys()  and update_dict['partition_time'] != '' else None
+    partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys()  and update_dict['partition_num'] != '' else None
+    partition_info_str = ''
+    if partition_info is not None and partition_time is not None and partition_num is not None:
+        partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
+    elif partition_info is not None and (partition_time is None or partition_num is None):
+        raise Exception('分区信息不完善')
+    update_dict.update({
+        'partition_info': partition_info_str,
+    })
+    for k, v in update_dict.items():
+        setattr(db_item, k, v)
+    db_item.update_time = int(time.time())
+    # update the Airflow-side sync job
+    af_job = datax_update_job(db_item, db)
+    crud.update_job_info(db,id,db_item)
+    return db_item
+
+def execute_job_services(db: Session, job_id: int):
+    relation = crud.get_af_id(db, job_id, 'datax')
+    res = execute_job(relation.af_id)
+    return res
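
A worked example of the partition string these services assemble, with illustrative values for the three fields:

    partition_info, partition_num, partition_time = 'dt', 7, 'yyyy-MM-dd'
    # same concatenation as create_job_info_services / update_job_info_services
    partition_info_str = partition_info + ',' + str(partition_num) + ',' + partition_time
    assert partition_info_str == 'dt,7,yyyy-MM-dd'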

+ 8 - 0
configs/settings.py

@@ -1,5 +1,6 @@
 
 import configparser
+import json
 import os
 
 class DefaultOption(dict):
@@ -22,6 +23,13 @@ if os.environ.get('APP_ENV', 'development') == 'development':
     config.readfp(open('development.ini'))
 elif os.environ.get('APP_ENV') == 'production':
     config.readfp(open('production.ini'))
+elif os.environ.get('APP_ENV') == 'idctest':
+    config.readfp(open('idctest.ini'))
 
 print(f"get config of {os.environ.get('APP_ENV')}")
 print(config.get('DATABASE', 'host'))
+hadoop_config = config.get('HADOOP_INNER', 'hadoop_config')
+print(json.loads(hadoop_config))
+
+
+print((config.get('HADOOP_INNER', 'datasource')).split(','))

+ 7 - 0
data/data.sql

@@ -322,4 +322,11 @@ ADD COLUMN `table_name` varchar(255) NULL COMMENT '表名' AFTER `name`;
 ALTER TABLE `constant`
 MODIFY COLUMN `value` varchar(500) CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL COMMENT '常量值' AFTER `type`;
 
+
+-- ----------------------------
+-- Alter for job_jdbc_datasource
+-- ----------------------------
+ALTER TABLE `job_jdbc_datasource`
+ADD COLUMN `use_ssl` tinyint(1) NULL COMMENT '是否使用ssl(0:不使用,1:使用)' AFTER `principal`;
+
 SET FOREIGN_KEY_CHECKS = 1;

+ 18 - 0
development.ini

@@ -4,6 +4,7 @@ pwd = happylay
 db_name = datax_web_dev
 host = 192.168.199.107
 port = 10086
+ssl_disabled = true
 
 [MINIO]
 url = minio-api.sxkj.com
@@ -47,3 +48,20 @@ datax=SXKJ:32775/pod_datax:0.9
 python=SXKJ:32775/pod_python:1.1
 java=SXKJ:32775/java:1.0
 sparks=SXKJ:32775/jupyter:0.981
+
+
+[HADOOP_INNER]
+datasource = -1
+default_fs = hdfs://HDFS8000912
+hadoop_config={
+            "dfs.nameservices": "HDFS8000912",
+            "dfs.ha.namenodes.HDFS8000912": "nn1,nn2",
+            "dfs.namenode.rpc-address.HDFS8000912.nn1": "10.254.20.18:4007",
+            "dfs.namenode.rpc-address.HDFS8000912.nn2": "10.254.20.22:4007",
+            "dfs.client.failover.proxy.provider.HDFS8000912": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+             }
+kerberos_config = {
+                    "haveKerberos": "true",
+                    "kerberosKeytabFilePath": "/workspace/confs/test/user.keytab",
+                    "kerberosPrincipal": "ailab@EMR-5XJSY31F"
+                  }
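
configparser treats the indented lines above as one multi-line value, so the JSON can be handed straight to json.loads, which is what configs/settings.py and _build_hadoop_configs in app/core/datax/hdfs.py do. A small sketch of how the [HADOOP_INNER] keys are consumed (the id 8 is only an example):

    import json
    from configs.settings import config

    hadoop_config = json.loads(config.get('HADOOP_INNER', 'hadoop_config'))
    kerberos_config = json.loads(config.get('HADOOP_INNER', 'kerberos_config'))
    kerberos_ds_ids = config.get('HADOOP_INNER', 'datasource').split(',')

    # a datasource is routed through the Kerberos-enabled HDFS settings only if its id is listed
    needs_kerberos = str(8) in kerberos_ds_ids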

+ 1 - 0
environment.yml

@@ -30,6 +30,7 @@ dependencies:
       - thrift==0.16.0
       - thrift-sasl==0.4.3
       - mysql-connector==2.2.9
+      - mysql-connector-python==8.0.29
       - SQLAlchemy==1.4.9
       - numpy
       - pandas

+ 72 - 0
idctest.ini

@@ -0,0 +1,72 @@
+[DATABASE]
+user = test_dev
+pwd = Yldve35@!
+db_name = aihubtest_dag_admin_db
+host = 10.138.143.44
+port = 3306
+ssl_disabled = true
+
+[MINIO]
+k8s_url = aihub-minio-yili-test:9000
+url = aihub-minio-yili-test:9000
+access_key = minioadmin
+secret_key = minioadmin
+
+
+[AF_BACKEND]
+uri=aihub-backend-af-yili-test:8080
+host=aihub-backend-af-yili-test
+port=8080
+dag_files_dir=/dags/
+
+[K8S]
+image_pull_key=codingregistrykey
+enable_kerberos=true
+
+[BACKEND]
+url=aihub-backend-yili-test:8080
+
+[AIRFLOW]
+uri=airflow-webserver:8080
+api_token=YWRtaW46YWRtaW4=
+
+
+[HIVE]
+host = 10.254.20.22
+port = 7001
+username = hive
+password = hive
+database_name = ailab
+kerberos = 1
+keytab = assets/test/user.keytab
+krb5config = assets/test/krb5.conf
+kerberos_service_name = hadoop
+principal = ailab@EMR-5XJSY31F
+
+
+[HIVE_METASTORE]
+uris=thrift://10.254.20.18:7004,thrift://10.254.20.22:7004
+
+[TASK_IMAGES]
+datax=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-datax-yili:latest
+python=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-minio-yili-test:python
+java=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-minio-yili-test:java
+sparks=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-minio-yili-test:spark
+
+
+
+[HADOOP_INNER]
+datasource = 8,10,11
+default_fs = hdfs://HDFS8000912
+hadoop_config={
+            "dfs.nameservices": "HDFS8000912",
+            "dfs.ha.namenodes.HDFS8000912": "nn1,nn2",
+            "dfs.namenode.rpc-address.HDFS8000912.nn1": "10.254.20.18:4007",
+            "dfs.namenode.rpc-address.HDFS8000912.nn2": "10.254.20.22:4007",
+            "dfs.client.failover.proxy.provider.HDFS8000912": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+             }
+kerberos_config = {
+                    "haveKerberos": "true",
+                    "kerberosKeytabFilePath": "/workspace/confs/test/user.keytab",
+                    "kerberosPrincipal": "ailab@EMR-5XJSY31F"
+                  }

+ 17 - 3
production.ini

@@ -4,6 +4,8 @@ pwd = q9WBYDynEy@jh#5N
 db_name = aihubtest_dag_admin_db
 host = 10.254.12.7
 port = 3306
+ssl_disabled = true
+
 [MINIO]
 k8s_url = aihub-minio-yili-test:9000
 url = aihub-minio-yili-test:9000
@@ -44,7 +46,6 @@ principal = ailab@EMR-5XJSY31F
 
 [HIVE_METASTORE]
 uris=thrift://10.254.20.18:7004,thrift://10.254.20.22:7004
-;uris=thrift://10.254.20.22:7004
 
 [TASK_IMAGES]
 datax=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-datax-yili:latest
@@ -54,5 +55,18 @@ sparks=yldc-docker.pkg.coding.yili.com/aiplatform/docker/aihub-minio-yili-test:s
 
 
 
-
-
+[HADOOP_INNER]
+datasource = 8,10,11
+default_fs = hdfs://HDFS8000912
+hadoop_config={
+            "dfs.nameservices": "HDFS8000912",
+            "dfs.ha.namenodes.HDFS8000912": "nn1,nn2",
+            "dfs.namenode.rpc-address.HDFS8000912.nn1": "10.254.20.18:4007",
+            "dfs.namenode.rpc-address.HDFS8000912.nn2": "10.254.20.22:4007",
+            "dfs.client.failover.proxy.provider.HDFS8000912": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+             }
+kerberos_config = {
+                    "haveKerberos": "true",
+                    "kerberosKeytabFilePath": "/workspace/confs/test/user.keytab",
+                    "kerberosPrincipal": "ailab@EMR-5XJSY31F"
+                  }