import re
import time
from typing import Optional

from fastapi import APIRouter, Depends, Header
from sqlalchemy.orm import Session

import app.crud as crud
import app.utils.send_util as send_util
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from fastapi_pagination import Page, add_pagination, paginate, Params
from app.common.security.auth import verify_super_admin, verify_special, verify_users
from app.common.hive import hiveDs
from app import schemas, get_db
from configs.settings import DefaultOption, config

# Values read once at import time from the project configuration.
DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
STRUCTURING_DATA_DBID = config.get('AI_YIQI', 'STRUCTURING_DATA_DBID')

# Every route declared in this module is mounted under /jpt/datasource.
router = APIRouter(
    prefix="/jpt/datasource",
    tags=["datasource-数据源管理"],
)


@router.post("/test")
@web_try()
@sxtimeit
def test_datasource_connection(
    ds: schemas.JobJdbcDatasourceCreate,
    token_data: schemas.TokenData = Depends(verify_super_admin),
    db: Session = Depends(get_db),
):
    """Pass the datasource payload to ``crud.test_datasource_connection``.

    Guarded by the ``verify_super_admin`` dependency. Returns whatever the
    crud call returns.
    """
    return crud.test_datasource_connection(db, ds)


@router.post("/preview")
@web_try()
@sxtimeit
def get_preview_data(
    ds_id: int,
    table_name: str,
    limit: int = 100,
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_preview_data`` for a table of datasource ``ds_id``.

    ``limit`` defaults to 100 and is forwarded unchanged to the crud layer.
    Guarded by the ``verify_special`` dependency.
    """
    return crud.get_preview_data(db, ds_id, table_name, limit)


@router.post("/table_names", description="获取所有表名")
@web_try()
@sxtimeit
def get_table_names(
    ds_id: int,
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_table_names`` for datasource ``ds_id``.

    Guarded by the ``verify_special`` dependency.
    """
    return crud.get_table_names(db, ds_id)


@router.post("/table_schema", description="获取表结构信息")
@web_try()
@sxtimeit
def get_table_schema(
    ds_id: int,
    table_name: str,
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_table_schema`` for ``table_name`` in datasource ``ds_id``.

    Guarded by the ``verify_special`` dependency.
    """
    return crud.get_table_schema(db, ds_id, table_name)


@router.post("/")
@web_try()
@sxtimeit
def create_datasource(
    ds: schemas.JobJdbcDatasourceCreate,
    token_data: schemas.TokenData = Depends(verify_super_admin),
    db: Session = Depends(get_db),
):
    """Create a datasource via ``crud.create_job_jdbc_datasource``.

    Guarded by the ``verify_super_admin`` dependency. Returns whatever the
    crud call returns.
    """
    return crud.create_job_jdbc_datasource(db, ds)
@router.get("/")
@web_try()
@sxtimeit
def get_datasources(
    datasource_type: Optional[str] = None,
    params: Params = Depends(),
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Return a paginated view of ``crud.get_job_jdbc_datasources``.

    ``datasource_type`` is forwarded to the crud call (``None`` when the
    caller omits it); ``params`` carries the pagination arguments handed to
    ``paginate``. Guarded by the ``verify_special`` dependency.
    """
    return paginate(crud.get_job_jdbc_datasources(db, datasource_type), params)


@router.get("/info")
@web_try()
@sxtimeit
def get_datasources_info(
    ds_id: int,
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_job_jdbc_datasources_info`` for datasource ``ds_id``.

    Guarded by the ``verify_special`` dependency.
    """
    return crud.get_job_jdbc_datasources_info(db, ds_id)


@router.put("/{ds_id}")
@web_try()
@sxtimeit
def update_datasource(
    ds_id: int,
    ds: schemas.JobJdbcDatasourceUpdate,
    token_data: schemas.TokenData = Depends(verify_super_admin),
    db: Session = Depends(get_db),
):
    """Update datasource ``ds_id`` via ``crud.update_job_jdbc_datasources``.

    Guarded by the ``verify_super_admin`` dependency.
    """
    return crud.update_job_jdbc_datasources(db, ds_id, ds)


@router.delete("/{ds_id}")
@web_try()
@sxtimeit
def delete_job_jdbc_datasource(
    ds_id: int,
    token_data: schemas.TokenData = Depends(verify_super_admin),
    db: Session = Depends(get_db),
):
    """Delete datasource ``ds_id`` via ``crud.delete_job_jdbc_datasource``.

    Guarded by the ``verify_super_admin`` dependency.
    """
    return crud.delete_job_jdbc_datasource(db, ds_id)


@router.post("/import_datalake")
@web_try()
@sxtimeit
def import_datalake(
    item: schemas.ImportDataLake,
    token_data: schemas.TokenData = Depends(verify_super_admin),
    db: Session = Depends(get_db),
):
    """Forward ``item`` to ``crud.import_datalake`` and return its result.

    Guarded by the ``verify_super_admin`` dependency.
    """
    return crud.import_datalake(db, item)


@router.put("/update_datalake/{dl_id}")
@web_try()
@sxtimeit
def update_datalake(
    dl_id: int,
    item: schemas.ImportDataLake,
    token_data: schemas.TokenData = Depends(verify_super_admin),
    db: Session = Depends(get_db),
):
    """Update data-lake entry ``dl_id`` via ``crud.update_datalake``.

    Guarded by the ``verify_super_admin`` dependency.
    """
    return crud.update_datalake(db, dl_id, item)


@router.delete("/delete_datalake/{dl_id}")
@web_try()
@sxtimeit
def delete_datalake(
    dl_id: int,
    token_data: schemas.TokenData = Depends(verify_super_admin),
    db: Session = Depends(get_db),
):
    """Delete data-lake entry ``dl_id`` via ``crud.delete_datalake``.

    Guarded by the ``verify_super_admin`` dependency.
    """
    return crud.delete_datalake(db, dl_id)


@router.post("/share_ailab")
@web_try()
@sxtimeit
def share_ailab(
    item: schemas.ShareAilab,
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Forward ``item`` to ``crud.share_ailab`` and return its result.

    Guarded by the ``verify_special`` dependency.
    """
    return crud.share_ailab(db, item)


@router.post("/create_table")
@web_try()
@sxtimeit
def create_table(
    item: schemas.CreateAilab,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Forward ``item`` to ``crud.create_table`` and return its result.

    Guarded by the ``verify_users`` dependency.
    """
    return crud.create_table(db, item)


@router.get("/ailab_source")
@web_try()
@sxtimeit
def get_ailab_source(
    token_data: schemas.TokenData = Depends(verify_users),
):
    """Return a one-element list describing the configured Hive database.

    The entry is built from the module constant ``DATABASE_NAME`` and uses
    the fixed id ``-1``; no database lookup is performed here.
    """
    return [{
        'database_name': DATABASE_NAME,
        'datasource': "hive",
        'datasource_name': DATABASE_NAME,
        'id': -1
    }]


@router.get("/ailab_table")
@web_try()
@sxtimeit
def get_ailab_table(
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_ailab_table`` for the caller's ``project_id``.

    The project id is taken from the token data produced by ``verify_users``.
    """
    return crud.get_ailab_table(db, token_data.project_id)


@router.get("/ailab_table_schema")
@web_try()
@sxtimeit
def get_ailab_table_schema(
    table_name: str,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_ailab_table_schema`` for ``table_name``.

    Guarded by the ``verify_users`` dependency.
    """
    return crud.get_ailab_table_schema(db, table_name)


@router.get("/preview_ailab_table")
@web_try()
@sxtimeit
def get_preview_ailab_table(
    table_name: str,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_preview_ailab_table`` for ``table_name``.

    Guarded by the ``verify_users`` dependency.
    """
    return crud.get_preview_ailab_table(db, table_name)


@router.get("/lake_table")
@web_try()
@sxtimeit
def get_lake_table(
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_lake_table`` for the caller's ``project_id``.

    The project id is taken from the token data produced by ``verify_special``.
    """
    return crud.get_lake_table(db, token_data.project_id)


@router.get("/lake_table_info")
@web_try()
@sxtimeit
def get_lake_table_info(
    dl_id: int,
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_lake_table_info`` for data-lake entry ``dl_id``.

    Guarded by the ``verify_special`` dependency.
    """
    return crud.get_lake_table_info(db, dl_id)


@router.get("/lake_table_schema")
@web_try()
@sxtimeit
def get_lake_table_schema(
    db_name: str,
    table_name: str,
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_lake_table_schema`` for ``db_name``.``table_name``.

    Guarded by the ``verify_special`` dependency.
    """
    return crud.get_lake_table_schema(db, db_name, table_name)


@router.get("/preview_lake_table")
@web_try()
@sxtimeit
def get_preview_lake_table(
    db_name: str,
    table_name: str,
    token_data: schemas.TokenData = Depends(verify_special),
    db: Session = Depends(get_db),
):
    """Return ``crud.get_preview_lake_table`` for ``db_name``.``table_name``.

    Guarded by the ``verify_special`` dependency.
    """
    return crud.get_preview_lake_table(db, db_name, table_name)


def _find_nth_occurrence(src, sub, skip, start=0):
    """Return the index of the ``(skip + 1)``-th match of ``sub`` in ``src``.

    The search starts at ``start`` and each subsequent search resumes one
    character past the previous match. Returns ``-1`` when ``src`` holds
    fewer than ``skip + 1`` matches.
    """
    index = src.find(sub, start)
    while index != -1 and skip > 0:
        index = src.find(sub, index + 1)
        skip -= 1
    return index


@router.get("/table_location")
@web_try()
@sxtimeit
def get_table_location(
    db_name: str,
    table_name: str,
    ds_id: Optional[int] = None,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Look up a table's storage location and split it into prefix and path.

    Without ``ds_id`` the table description comes from
    ``crud.get_table_info``; with it, from
    ``crud.get_job_jdbc_datasource_table_location``.

    Returns:
        ``{'location': ..., 'hdfs': ...}``. When the location value contains
        at least three ``/`` characters, ``hdfs`` is everything before the
        third one and ``location`` is the remainder (starting with that
        ``/``). Otherwise ``hdfs`` is ``''`` and ``location`` is the raw
        value, or ``''`` if no matching row exists.
    """
    if ds_id is None:
        res = crud.get_table_info(db, db_name, table_name)
    else:
        res = crud.get_job_jdbc_datasource_table_location(
            db, db_name, table_name, ds_id)

    location = ''
    hdfs = ''
    # res[0] is iterated as rows of (label, value, ...); presumably the output
    # of a Hive "describe formatted" style query — TODO confirm in crud.
    for row in res[0]:
        if 'Location' not in row[0]:
            continue
        location = row[1]
        # Presumably a URI such as scheme://host:port/path, where the third
        # '/' is the start of the path component — verify against real data.
        split_at = _find_nth_occurrence(location, '/', 2)
        if split_at > 0:
            hdfs, location = location[:split_at], location[split_at:]
            # NOTE(review): the scan stops only after a successful split;
            # confirm whether a row that cannot be split should end it too.
            break
    return {'location': location, 'hdfs': hdfs}


@router.post("/structuring_data")
@web_try()
@sxtimeit
def get_image_version(
    table_name: str,
    auth_token: str = Header(),
    token_data: schemas.TokenData = Depends(verify_users),
):
    """Submit a Hive table to the structuring-data service.

    Reads the table schema through ``hiveDs.get_table_schema``, builds the
    request payload and posts it with ``send_util.post_structuring_data``,
    forwarding the caller's ``auth_token`` header.

    NOTE(review): the function name does not match the ``/structuring_data``
    route; it is kept unchanged so existing references keep working.

    Returns:
        The ``'data'`` entry of the service response.

    Raises:
        Exception: if the table schema is empty (nothing to export).
    """
    created_at = int(time.time())
    # The timestamp suffix keeps repeated exports of one table distinct.
    name = f'{table_name}_{created_at}'

    table_schema = hiveDs.get_table_schema(table_name)
    if not table_schema:
        # Message reads: "table has no columns, cannot export".
        raise Exception('数据表无列,无法导出')

    # Each schema entry is stringified and split on ':'; the second field is
    # used as the header — presumably entries look like "<x>:<name>...";
    # confirm against hiveDs.get_table_schema.
    table_headers = [str(x).split(':')[1] for x in table_schema]

    request_data = {
        "name": name,
        "dsType": "all",
        "identifyField": table_headers[0],
        "dsClass": "1",
        "dataSaveCycle": "",
        "cronType": "",
        "cronTrigger": "",
        "reviewFlag": "0",
        "extractType": "0",
        "extractRows": "",
        "dataType": "db",
        "dsConf": {
            "type": "db",
            "dbId": STRUCTURING_DATA_DBID,
            "tableName": f'`{table_name}`',
            "cycleMask": "",
            "sqlSentence": ""
        },
        "metaDataTemplate": {
            "metaDataType": "1",
            "templatePath": "",
            "matchType": "1"
        },
        "dataTitleView": ",".join(table_headers)
    }
    res = send_util.post_structuring_data(request_data, auth_token)
    return res['data']
# Hook fastapi-pagination into this router's routes; the library expects this
# call once the routes that use ``paginate`` (e.g. get_datasources) exist.
add_pagination(router)