# data_management.py (2.5 KB)
  1. from asyncio import current_task
  2. from re import A
  3. import time
  4. from typing import Optional
  5. from fastapi import APIRouter
  6. from fastapi import Depends
  7. from sqlalchemy.orm import Session
  8. from app import schemas
  9. import app.crud as crud
  10. from app.services.dag import get_tmp_table_name
  11. from app.utils.send_util import data_transfer_run
  12. from utils.sx_time import sxtimeit
  13. from utils.sx_web import web_try
  14. from app.common.hive import hiveDs
  15. from app import get_db
  16. from configs.settings import DefaultOption, config
  17. database_name = config.get('HIVE', 'DATABASE_NAME')
  18. router = APIRouter(
  19. prefix="/jpt/datamanagement",
  20. tags=["datamanagement-数据管理"],
  21. )
  22. @router.post("/")
  23. @web_try()
  24. @sxtimeit
  25. def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
  26. current_time = int(time.time())
  27. table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
  28. tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
  29. af_run_id = data_transfer_run(tmp_table_name, table_name)
  30. res = crud.create_data_management(db, item, table_name)
  31. return res
  32. @router.get("/")
  33. @web_try()
  34. @sxtimeit
  35. def get_data_managements(user_id: str, project_id: str, db: Session = Depends(get_db)):
  36. res = crud.get_data_managements(db, user_id, project_id)
  37. for item in res:
  38. item.table_name = f'{database_name}.{item.table_name}'
  39. return res
  40. @router.get("/local")
  41. @web_try()
  42. @sxtimeit
  43. def get_local_data_managements(db: Session = Depends(get_db)):
  44. t_list = hiveDs.list_tables()
  45. res = [f'{database_name}.{t}' for t in t_list]
  46. return res
  47. @router.get("/table_schema")
  48. @web_try()
  49. @sxtimeit
  50. def get_data_managements_schema(table_name: str, db: Session = Depends(get_db)):
  51. table_name = table_name.split('.')[-1]
  52. return hiveDs.get_table_schema(table_name)
  53. @router.delete("/")
  54. @web_try()
  55. @sxtimeit
  56. def delete_data_management(data_management_id: int, db: Session = Depends(get_db)):
  57. data_management = crud.delete_data_management(db, data_management_id)
  58. return data_management
  59. @router.get("/table_content")
  60. @web_try()
  61. @sxtimeit
  62. def get_data_management_content(table_name: str, page: Optional[int] = 0, size: Optional[int] = 100, db: Session = Depends(get_db)):
  63. table_name = table_name.split('.')[-1]
  64. result = hiveDs.get_preview_data(table_name,limit=size,page=page)
  65. data_num = hiveDs.get_data_num(table_name)
  66. result.update({'total':data_num})
  67. return result