# job_jdbc_datasource.py — datasource management endpoints (/jpt/datasource)
  1. import re
  2. import time
  3. from typing import Optional
  4. from fastapi import APIRouter, Depends, Header
  5. from sqlalchemy.orm import Session
  6. import app.crud as crud
  7. import app.utils.send_util as send_util
  8. from utils.sx_time import sxtimeit
  9. from utils.sx_web import web_try
  10. from fastapi_pagination import Page, add_pagination, paginate, Params
  11. from app.common.security.auth import verify_super_admin, verify_special, verify_users
  12. from app.common.hive import hiveDs
  13. from app import schemas, get_db
  14. from configs.settings import DefaultOption, config
  15. DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
  16. STRUCTURING_DATA_DBID = config.get('AI_YIQI', 'STRUCTURING_DATA_DBID')
  17. router = APIRouter(
  18. prefix="/jpt/datasource",
  19. tags=["datasource-数据源管理"],
  20. )
  21. @router.post("/test")
  22. @web_try()
  23. @sxtimeit
  24. def test_datasource_connection(ds: schemas.JobJdbcDatasourceCreate,
  25. token_data: schemas.TokenData = Depends(verify_super_admin),
  26. db: Session = Depends(get_db)):
  27. return crud.test_datasource_connection(db, ds)
  28. @router.post("/preview")
  29. @web_try()
  30. @sxtimeit
  31. def get_preview_data(ds_id: int, table_name: str, limit: int = 100, token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  32. return crud.get_preview_data(db, ds_id, table_name, limit)
  33. @router.post("/table_names", description="获取所有表名")
  34. @web_try()
  35. @sxtimeit
  36. def get_table_names(ds_id: int, token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  37. return crud.get_table_names(db, ds_id)
  38. @router.post("/table_schema", description="获取表结构信息")
  39. @web_try()
  40. @sxtimeit
  41. def get_table_schema(ds_id: int, table_name: str, token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  42. return crud.get_table_schema(db, ds_id, table_name)
  43. @router.post("/")
  44. @web_try()
  45. @sxtimeit
  46. def create_datasource(ds: schemas.JobJdbcDatasourceCreate, token_data: schemas.TokenData = Depends(verify_super_admin), db: Session = Depends(get_db)):
  47. return crud.create_job_jdbc_datasource(db, ds)
  48. @router.get("/")
  49. @web_try()
  50. @sxtimeit
  51. def get_datasources(datasource_type: Optional[str] = None, params: Params = Depends(), token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  52. return paginate(crud.get_job_jdbc_datasources(db, datasource_type), params)
  53. @router.get("/info")
  54. @web_try()
  55. @sxtimeit
  56. def get_datasources_info(ds_id: int, token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  57. return crud.get_job_jdbc_datasources_info(db, ds_id)
  58. @router.put("/{ds_id}")
  59. @web_try()
  60. @sxtimeit
  61. def update_datasource(ds_id: int, ds: schemas.JobJdbcDatasourceUpdate, token_data: schemas.TokenData = Depends(verify_super_admin), db: Session = Depends(get_db)):
  62. return crud.update_job_jdbc_datasources(db, ds_id, ds)
  63. @router.delete("/{ds_id}")
  64. @web_try()
  65. @sxtimeit
  66. def delete_job_jdbc_datasource(ds_id: int, token_data: schemas.TokenData = Depends(verify_super_admin), db: Session = Depends(get_db)):
  67. return crud.delete_job_jdbc_datasource(db, ds_id)
  68. @router.post("/import_datalake")
  69. @web_try()
  70. @sxtimeit
  71. def import_datalake(item: schemas.ImportDataLake, token_data: schemas.TokenData = Depends(verify_super_admin), db: Session = Depends(get_db)):
  72. return crud.import_datalake(db, item)
  73. @router.put("/update_datalake/{dl_id}")
  74. @web_try()
  75. @sxtimeit
  76. def update_datalake(dl_id: int, item: schemas.ImportDataLake, token_data: schemas.TokenData = Depends(verify_super_admin), db: Session = Depends(get_db)):
  77. return crud.update_datalake(db, dl_id, item)
  78. @router.delete("/delete_datalake/{dl_id}")
  79. @web_try()
  80. @sxtimeit
  81. def delete_datalake(dl_id: int, token_data: schemas.TokenData = Depends(verify_super_admin), db: Session = Depends(get_db)):
  82. return crud.delete_datalake(db, dl_id)
  83. @router.post("/share_ailab")
  84. @web_try()
  85. @sxtimeit
  86. def share_ailab(item: schemas.ShareAilab, token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  87. return crud.share_ailab(db, item)
  88. @router.post("/create_table")
  89. @web_try()
  90. @sxtimeit
  91. def create_table(item: schemas.CreateAilab, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  92. return crud.create_table(db, item)
  93. @router.get("/ailab_source")
  94. @web_try()
  95. @sxtimeit
  96. def get_ailab_source(token_data: schemas.TokenData = Depends(verify_users), ):
  97. return [{
  98. 'database_name': DATABASE_NAME,
  99. 'datasource': "hive",
  100. 'datasource_name': DATABASE_NAME,
  101. 'id': -1
  102. }]
  103. @router.get("/ailab_table")
  104. @web_try()
  105. @sxtimeit
  106. def get_ailab_table(token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  107. return crud.get_ailab_table(db, token_data.project_id)
  108. @router.get("/ailab_table_schema")
  109. @web_try()
  110. @sxtimeit
  111. def get_ailab_table_schema(table_name: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  112. return crud.get_ailab_table_schema(db, table_name)
  113. @router.get("/preview_ailab_table")
  114. @web_try()
  115. @sxtimeit
  116. def get_preview_ailab_table(table_name: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  117. return crud.get_preview_ailab_table(db, table_name)
  118. @router.get("/lake_table")
  119. @web_try()
  120. @sxtimeit
  121. def get_lake_table(token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  122. return crud.get_lake_table(db, token_data.project_id)
  123. @router.get("/lake_table_info")
  124. @web_try()
  125. @sxtimeit
  126. def get_lake_table_info(dl_id: int, token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  127. return crud.get_lake_table_info(db, dl_id)
  128. @router.get("/lake_table_schema")
  129. @web_try()
  130. @sxtimeit
  131. def get_lake_table_schema(db_name: str, table_name: str, token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  132. return crud.get_lake_table_schema(db, db_name, table_name)
  133. @router.get("/preview_lake_table")
  134. @web_try()
  135. @sxtimeit
  136. def get_preview_lake_table(db_name: str, table_name: str, token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
  137. return crud.get_preview_lake_table(db, db_name, table_name)
  138. @router.get("/table_location")
  139. @web_try()
  140. @sxtimeit
  141. def get_table_location(db_name: str, table_name: str, ds_id: Optional[int] = None, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  142. def find_n_sub_str(src, sub, pos, start):
  143. index = src.find(sub, start)
  144. if index != -1 and pos > 0:
  145. return find_n_sub_str(src, sub, pos - 1, index + 1)
  146. return index
  147. res = None
  148. if ds_id is None:
  149. res = crud.get_table_info(db, db_name, table_name)
  150. else:
  151. res = crud.get_job_jdbc_datasource_table_location(
  152. db, db_name, table_name, ds_id)
  153. location = ''
  154. hdfs = ''
  155. for line_list in res[0]:
  156. if line_list[0].find('Location') >= 0:
  157. location = line_list[1]
  158. index = int(find_n_sub_str(location, '/', 2, 0))
  159. if index > 0:
  160. hdfs, location = location[0:index], location[index:]
  161. break
  162. return {'location': location, 'hdfs': hdfs}
  163. @router.post("/structuring_data")
  164. @web_try()
  165. @sxtimeit
  166. def get_image_version(table_name: str, auth_token: str = Header(), token_data: schemas.TokenData = Depends(verify_users)):
  167. current_time = int(time.time())
  168. name = f'{table_name}_{current_time}'
  169. table_schema = hiveDs.get_table_schema(table_name)
  170. if len(table_schema) == 0:
  171. raise Exception('数据表无列,无法导出')
  172. table_headers = [str(x).split(':')[1] for x in table_schema]
  173. dataTitleView = ''
  174. for header in table_headers:
  175. dataTitleView = dataTitleView + header + ','
  176. dataTitleView = dataTitleView[0:-1]
  177. request_data = {
  178. "name": name,
  179. "dsType": "all",
  180. "identifyField": table_headers[0],
  181. "dsClass": "1",
  182. "dataSaveCycle": "",
  183. "cronType": "",
  184. "cronTrigger": "",
  185. "reviewFlag": "0",
  186. "extractType": "0",
  187. "extractRows": "",
  188. "dataType": "db",
  189. "dsConf": {
  190. "type": "db",
  191. "dbId": STRUCTURING_DATA_DBID,
  192. "tableName": f'`{table_name}`',
  193. "cycleMask": "",
  194. "sqlSentence": ""
  195. },
  196. "metaDataTemplate": {
  197. "metaDataType": "1",
  198. "templatePath": "",
  199. "matchType": "1"
  200. },
  201. "dataTitleView": dataTitleView
  202. }
  203. res = send_util.post_structuring_data(request_data, auth_token)
  204. return res['data']
  205. add_pagination(router)