job_jdbc_datasource.py

from typing import Optional

from fastapi import APIRouter, Depends
from fastapi_pagination import Params, add_pagination, paginate
from sqlalchemy.orm import Session

import app.crud as crud
from app import get_db, schemas
from app.common.decorators import verify_all, verify_special, verify_super_admin
from configs.globals import g
from configs.settings import config
from utils.sx_time import sxtimeit
from utils.sx_web import web_try

# Hive database exposed as the AI Lab datasource (see get_ailab_source below),
# read from the [HIVE] section of the config.
DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
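
# All handlers below delegate to app.crud. As the names suggest, @web_try()
# is assumed to wrap the result (or any exception) in a uniform web response,
# and @sxtimeit to log execution time; both live in the project's utils package.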
router = APIRouter(
    prefix="/jpt/datasource",
    tags=["datasource - data source management"],
)
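

# ---- Connection test and metadata inspection ----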
@router.post("/test", dependencies=[Depends(verify_super_admin)])
@web_try()
@sxtimeit
def test_datasource_connection(ds: schemas.JobJdbcDatasourceCreate, db: Session = Depends(get_db)):
    return crud.test_datasource_connection(db, ds)


@router.post("/preview", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def get_preview_data(ds_id: int, table_name: str, limit: int = 100, db: Session = Depends(get_db)):
    return crud.get_preview_data(db, ds_id, table_name, limit)


@router.post("/table_names", description="List all table names", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def get_table_names(ds_id: int, db: Session = Depends(get_db)):
    return crud.get_table_names(db, ds_id)


@router.post("/table_schema", description="Get table schema information", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def get_table_schema(ds_id: int, table_name: str, db: Session = Depends(get_db)):
    return crud.get_table_schema(db, ds_id, table_name)
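

# ---- Datasource CRUD ----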
@router.post("/", dependencies=[Depends(verify_super_admin)])
@web_try()
@sxtimeit
def create_datasource(ds: schemas.JobJdbcDatasourceCreate, db: Session = Depends(get_db)):
    return crud.create_job_jdbc_datasource(db, ds)
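

# Listing is paginated with fastapi-pagination: `params` carries the page/size
# query parameters and paginate() slices the list returned by the CRUD layer.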
@router.get("/", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def get_datasources(datasource_type: Optional[str] = None, params: Params = Depends(), db: Session = Depends(get_db)):
    return paginate(crud.get_job_jdbc_datasources(db, datasource_type), params)


@router.get("/info", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def get_datasources_info(ds_id: int, db: Session = Depends(get_db)):
    return crud.get_job_jdbc_datasources_info(db, ds_id)


@router.put("/{ds_id}", dependencies=[Depends(verify_super_admin)])
@web_try()
@sxtimeit
def update_datasource(ds_id: int, ds: schemas.JobJdbcDatasourceUpdate, db: Session = Depends(get_db)):
    return crud.update_job_jdbc_datasources(db, ds_id, ds)


@router.delete("/{ds_id}", dependencies=[Depends(verify_super_admin)])
@web_try()
@sxtimeit
def delete_job_jdbc_datasource(ds_id: int, db: Session = Depends(get_db)):
    return crud.delete_job_jdbc_datasource(db, ds_id)
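

# ---- Data lake table registration ----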
@router.post("/import_datalake", dependencies=[Depends(verify_super_admin)])
@web_try()
@sxtimeit
def import_datalake(item: schemas.ImportDataLake, db: Session = Depends(get_db)):
    return crud.import_datalake(db, item)


@router.put("/update_datalake/{dl_id}", dependencies=[Depends(verify_super_admin)])
@web_try()
@sxtimeit
def update_datalake(dl_id: int, item: schemas.ImportDataLake, db: Session = Depends(get_db)):
    return crud.update_datalake(db, dl_id, item)


@router.delete("/delete_datalake/{dl_id}", dependencies=[Depends(verify_super_admin)])
@web_try()
@sxtimeit
def delete_datalake(dl_id: int, db: Session = Depends(get_db)):
    return crud.delete_datalake(db, dl_id)
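

# ---- AI Lab tables: sharing, creation, and browsing ----
# The AI Lab Hive warehouse is exposed as a single pseudo-datasource
# (id -1) backed by DATABASE_NAME; tables are scoped to g.project_id.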
@router.post("/share_ailab", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def share_ailab(item: schemas.ShareAilab, db: Session = Depends(get_db)):
    return crud.share_ailab(db, item)


@router.post("/create_table", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def create_table(item: schemas.CreateAilab, db: Session = Depends(get_db)):
    return crud.create_table(db, item)


@router.get("/ailab_source", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_ailab_source():
    return [{
        'database_name': DATABASE_NAME,
        'datasource': "hive",
        'datasource_name': DATABASE_NAME,
        'id': -1,
    }]


@router.get("/ailab_table", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_ailab_table(db: Session = Depends(get_db)):
    return crud.get_ailab_table(db, g.project_id)


@router.get("/ailab_table_schema", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_ailab_table_schema(table_name: str, db: Session = Depends(get_db)):
    return crud.get_ailab_table_schema(db, table_name)


@router.get("/preview_ailab_table", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_preview_ailab_table(table_name: str, db: Session = Depends(get_db)):
    return crud.get_preview_ailab_table(db, table_name)
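

# ---- Data lake table browsing ----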
@router.get("/lake_table", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def get_lake_table(db: Session = Depends(get_db)):
    return crud.get_lake_table(db, g.project_id)


@router.get("/lake_table_info", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def get_lake_table_info(dl_id: int, db: Session = Depends(get_db)):
    return crud.get_lake_table_info(db, dl_id)


@router.get("/lake_table_schema", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def get_lake_table_schema(db_name: str, table_name: str, db: Session = Depends(get_db)):
    return crud.get_lake_table_schema(db, db_name, table_name)


@router.get("/preview_lake_table", dependencies=[Depends(verify_special)])
@web_try()
@sxtimeit
def get_preview_lake_table(db_name: str, table_name: str, db: Session = Depends(get_db)):
    return crud.get_preview_lake_table(db, db_name, table_name)
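

# Resolve a table's storage location. The CRUD helpers are expected to return
# DESCRIBE FORMATTED-style rows; the row whose first column contains
# "Location" holds a URI such as hdfs://host:port/warehouse/db/table, which
# is split into its hdfs://host:port prefix and the path that follows.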
@router.get("/table_location", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_table_location(db_name: str, table_name: str, ds_id: Optional[int] = None, db: Session = Depends(get_db)):
    def find_n_sub_str(src, sub, pos, start):
        # Return the index of the (pos + 1)-th occurrence of `sub` in `src`
        # at or after `start`, or -1 if there are not enough occurrences.
        index = src.find(sub, start)
        if index != -1 and pos > 0:
            return find_n_sub_str(src, sub, pos - 1, index + 1)
        return index

    if ds_id is None:
        res = crud.get_table_info(db, db_name, table_name)
    else:
        res = crud.get_job_jdbc_datasource_table_location(db, db_name, table_name, ds_id)

    location = ''
    hdfs = ''
    for line_list in res[0]:
        if 'Location' in line_list[0]:
            location = line_list[1]
            # The third "/" in a URI like hdfs://host:port/path marks the
            # start of the path; split the URI there.
            index = find_n_sub_str(location, '/', 2, 0)
            if index > 0:
                hdfs, location = location[:index], location[index:]
            break
    return {'location': location, 'hdfs': hdfs}
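

# Wire up fastapi-pagination for this router.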
add_pagination(router)