123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- import re
- import time
- from typing import Optional
- from fastapi import APIRouter, Depends, Header
- from sqlalchemy.orm import Session
- import app.crud as crud
- import app.utils.send_util as send_util
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi_pagination import Page, add_pagination, paginate, Params
- from app.common.security.auth import verify_super_admin, verify_special, verify_users
- from app.common.hive import hiveDs
- from app import schemas, get_db
- from configs.settings import DefaultOption, config
# Hive database backing the built-in AI-Lab datasource (served by get_ailab_source).
DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
# Datasource id sent to the structuring-data service when exporting hive tables.
STRUCTURING_DATA_DBID = config.get('AI_YIQI', 'STRUCTURING_DATA_DBID')
# Router for datasource-management endpoints; all paths live under /jpt/datasource.
router = APIRouter(
    prefix="/jpt/datasource",
    tags=["datasource-数据源管理"],
)
@router.post("/test")
@web_try()
@sxtimeit
def test_datasource_connection(ds: schemas.JobJdbcDatasourceCreate,
                               token_data: schemas.TokenData = Depends(verify_super_admin),
                               db: Session = Depends(get_db)):
    """Check whether the submitted JDBC datasource definition is reachable.

    Super-admin only; delegates to crud.test_datasource_connection.
    """
    outcome = crud.test_datasource_connection(db, ds)
    return outcome
@router.post("/preview")
@web_try()
@sxtimeit
def get_preview_data(ds_id: int, table_name: str, limit: int = 100,
                     token_data: schemas.TokenData = Depends(verify_special),
                     db: Session = Depends(get_db)):
    """Fetch up to ``limit`` preview rows of ``table_name`` from datasource ``ds_id``."""
    preview = crud.get_preview_data(db, ds_id, table_name, limit)
    return preview
@router.post("/table_names", description="获取所有表名")
@web_try()
@sxtimeit
def get_table_names(ds_id: int,
                    token_data: schemas.TokenData = Depends(verify_special),
                    db: Session = Depends(get_db)):
    """List every table name available in datasource ``ds_id``."""
    names = crud.get_table_names(db, ds_id)
    return names
@router.post("/table_schema", description="获取表结构信息")
@web_try()
@sxtimeit
def get_table_schema(ds_id: int, table_name: str,
                     token_data: schemas.TokenData = Depends(verify_special),
                     db: Session = Depends(get_db)):
    """Return the column/structure description of ``table_name`` in datasource ``ds_id``."""
    schema = crud.get_table_schema(db, ds_id, table_name)
    return schema
@router.post("/")
@web_try()
@sxtimeit
def create_datasource(ds: schemas.JobJdbcDatasourceCreate,
                      token_data: schemas.TokenData = Depends(verify_super_admin),
                      db: Session = Depends(get_db)):
    """Persist a new JDBC datasource record (super-admin only)."""
    created = crud.create_job_jdbc_datasource(db, ds)
    return created
@router.get("/")
@web_try()
@sxtimeit
def get_datasources(datasource_type: Optional[str] = None, params: Params = Depends(),
                    token_data: schemas.TokenData = Depends(verify_special),
                    db: Session = Depends(get_db)):
    """Return a paginated listing of datasources, optionally filtered by type."""
    sources = crud.get_job_jdbc_datasources(db, datasource_type)
    return paginate(sources, params)
@router.get("/info")
@web_try()
@sxtimeit
def get_datasources_info(ds_id: int,
                         token_data: schemas.TokenData = Depends(verify_special),
                         db: Session = Depends(get_db)):
    """Return detail information for the datasource identified by ``ds_id``."""
    info = crud.get_job_jdbc_datasources_info(db, ds_id)
    return info
@router.put("/{ds_id}")
@web_try()
@sxtimeit
def update_datasource(ds_id: int, ds: schemas.JobJdbcDatasourceUpdate,
                      token_data: schemas.TokenData = Depends(verify_super_admin),
                      db: Session = Depends(get_db)):
    """Apply the submitted changes to datasource ``ds_id`` (super-admin only)."""
    updated = crud.update_job_jdbc_datasources(db, ds_id, ds)
    return updated
@router.delete("/{ds_id}")
@web_try()
@sxtimeit
def delete_job_jdbc_datasource(ds_id: int,
                               token_data: schemas.TokenData = Depends(verify_super_admin),
                               db: Session = Depends(get_db)):
    """Remove the datasource identified by ``ds_id`` (super-admin only)."""
    removed = crud.delete_job_jdbc_datasource(db, ds_id)
    return removed
@router.post("/import_datalake")
@web_try()
@sxtimeit
def import_datalake(item: schemas.ImportDataLake,
                    token_data: schemas.TokenData = Depends(verify_super_admin),
                    db: Session = Depends(get_db)):
    """Register a data-lake table from the submitted import payload (super-admin only)."""
    imported = crud.import_datalake(db, item)
    return imported
@router.put("/update_datalake/{dl_id}")
@web_try()
@sxtimeit
def update_datalake(dl_id: int, item: schemas.ImportDataLake,
                    token_data: schemas.TokenData = Depends(verify_super_admin),
                    db: Session = Depends(get_db)):
    """Update the data-lake entry ``dl_id`` with the submitted payload (super-admin only)."""
    updated = crud.update_datalake(db, dl_id, item)
    return updated
@router.delete("/delete_datalake/{dl_id}")
@web_try()
@sxtimeit
def delete_datalake(dl_id: int,
                    token_data: schemas.TokenData = Depends(verify_super_admin),
                    db: Session = Depends(get_db)):
    """Delete the data-lake entry identified by ``dl_id`` (super-admin only)."""
    removed = crud.delete_datalake(db, dl_id)
    return removed
@router.post("/share_ailab")
@web_try()
@sxtimeit
def share_ailab(item: schemas.ShareAilab,
                token_data: schemas.TokenData = Depends(verify_special),
                db: Session = Depends(get_db)):
    """Share data with AI-Lab according to the submitted payload."""
    shared = crud.share_ailab(db, item)
    return shared
@router.post("/create_table")
@web_try()
@sxtimeit
def create_table(item: schemas.CreateAilab,
                 token_data: schemas.TokenData = Depends(verify_users),
                 db: Session = Depends(get_db)):
    """Create an AI-Lab table from the submitted definition."""
    created = crud.create_table(db, item)
    return created
@router.get("/ailab_source")
@web_try()
@sxtimeit
def get_ailab_source(token_data: schemas.TokenData = Depends(verify_users)):
    """Return the single built-in AI-Lab hive source as a one-element list.

    The entry is synthetic (id -1) and points at the configured DATABASE_NAME.
    """
    source = {
        'database_name': DATABASE_NAME,
        'datasource': "hive",
        'datasource_name': DATABASE_NAME,
        'id': -1
    }
    return [source]
@router.get("/ailab_table")
@web_try()
@sxtimeit
def get_ailab_table(token_data: schemas.TokenData = Depends(verify_users),
                    db: Session = Depends(get_db)):
    """List AI-Lab tables scoped to the caller's project."""
    tables = crud.get_ailab_table(db, token_data.project_id)
    return tables
@router.get("/ailab_table_schema")
@web_try()
@sxtimeit
def get_ailab_table_schema(table_name: str,
                           token_data: schemas.TokenData = Depends(verify_users),
                           db: Session = Depends(get_db)):
    """Return the schema of the AI-Lab table ``table_name``."""
    schema = crud.get_ailab_table_schema(db, table_name)
    return schema
@router.get("/preview_ailab_table")
@web_try()
@sxtimeit
def get_preview_ailab_table(table_name: str,
                            token_data: schemas.TokenData = Depends(verify_users),
                            db: Session = Depends(get_db)):
    """Return preview rows of the AI-Lab table ``table_name``."""
    preview = crud.get_preview_ailab_table(db, table_name)
    return preview
@router.get("/lake_table")
@web_try()
@sxtimeit
def get_lake_table(token_data: schemas.TokenData = Depends(verify_special),
                   db: Session = Depends(get_db)):
    """List data-lake tables scoped to the caller's project."""
    tables = crud.get_lake_table(db, token_data.project_id)
    return tables
@router.get("/lake_table_info")
@web_try()
@sxtimeit
def get_lake_table_info(dl_id: int,
                        token_data: schemas.TokenData = Depends(verify_special),
                        db: Session = Depends(get_db)):
    """Return detail information for the data-lake entry ``dl_id``."""
    info = crud.get_lake_table_info(db, dl_id)
    return info
@router.get("/lake_table_schema")
@web_try()
@sxtimeit
def get_lake_table_schema(db_name: str, table_name: str,
                          token_data: schemas.TokenData = Depends(verify_special),
                          db: Session = Depends(get_db)):
    """Return the schema of ``db_name.table_name`` in the data lake."""
    schema = crud.get_lake_table_schema(db, db_name, table_name)
    return schema
@router.get("/preview_lake_table")
@web_try()
@sxtimeit
def get_preview_lake_table(db_name: str, table_name: str,
                           token_data: schemas.TokenData = Depends(verify_special),
                           db: Session = Depends(get_db)):
    """Return preview rows of ``db_name.table_name`` in the data lake."""
    preview = crud.get_preview_lake_table(db, db_name, table_name)
    return preview
@router.get("/table_location")
@web_try()
@sxtimeit
def get_table_location(db_name: str, table_name: str, ds_id: Optional[int] = None,
                       token_data: schemas.TokenData = Depends(verify_users),
                       db: Session = Depends(get_db)):
    """Resolve the storage URI of a table and split it into cluster prefix and path.

    Looks up the table's info rows (from the project DB when ``ds_id`` is None,
    otherwise from the JDBC datasource ``ds_id``), finds the row whose first
    column mentions 'Location', and splits its URI at the third '/' — e.g.
    'hdfs://host:port/warehouse/t' -> hdfs='hdfs://host:port',
    location='/warehouse/t'.  Both fields are '' when no Location row is found.
    """
    def _nth_index(src: str, sub: str, n: int) -> int:
        """Return the index of the (n+1)-th occurrence of ``sub`` in ``src``, or -1."""
        index = -1
        for _ in range(n + 1):
            index = src.find(sub, index + 1)
            if index == -1:
                return -1
        return index

    if ds_id is None:
        res = crud.get_table_info(db, db_name, table_name)
    else:
        res = crud.get_job_jdbc_datasource_table_location(
            db, db_name, table_name, ds_id)

    location = ''
    hdfs = ''
    # res[0] is an iterable of (key, value) rows; only the 'Location' row matters.
    for row in res[0]:
        if 'Location' in row[0]:
            location = row[1]
            split_at = _nth_index(location, '/', 2)
            if split_at > 0:
                hdfs, location = location[:split_at], location[split_at:]
            break
    return {'location': location, 'hdfs': hdfs}
@router.post("/structuring_data")
@web_try()
@sxtimeit
def get_image_version(table_name: str, auth_token: str = Header(), token_data: schemas.TokenData = Depends(verify_users)):
    """Export the hive table ``table_name`` to the structuring-data service.

    Builds a task payload (named ``<table_name>_<unix_ts>``) from the table's
    hive schema and posts it via send_util with the caller's auth token.
    Raises when the table has no columns.
    """
    current_time = int(time.time())
    name = f'{table_name}_{current_time}'
    table_schema = hiveDs.get_table_schema(table_name)
    if not table_schema:
        raise Exception('数据表无列,无法导出')
    # Each schema entry is ':'-separated; the second field is the column name.
    table_headers = [str(x).split(':')[1] for x in table_schema]
    # Comma-joined header list shown by the structuring-data service.
    dataTitleView = ','.join(table_headers)
    request_data = {
        "name": name,
        "dsType": "all",
        "identifyField": table_headers[0],
        "dsClass": "1",
        "dataSaveCycle": "",
        "cronType": "",
        "cronTrigger": "",
        "reviewFlag": "0",
        "extractType": "0",
        "extractRows": "",
        "dataType": "db",
        "dsConf": {
            "type": "db",
            "dbId": STRUCTURING_DATA_DBID,
            "tableName": f'`{table_name}`',
            "cycleMask": "",
            "sqlSentence": ""
        },
        "metaDataTemplate": {
            "metaDataType": "1",
            "templatePath": "",
            "matchType": "1"
        },
        "dataTitleView": dataTitleView
    }
    res = send_util.post_structuring_data(request_data, auth_token)
    return res['data']
- add_pagination(router)
|