data_management.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. from asyncio import current_task
  2. from re import A, I
  3. import time
  4. from typing import Optional
  5. from fastapi import APIRouter
  6. from fastapi import Depends
  7. from sqlalchemy.orm import Session
  8. from app import schemas
  9. from app.common.security.auth import verify_users
  10. import app.crud as crud
  11. from app.services.dag import get_tmp_table_name
  12. from app.utils.send_util import data_transfer_run, get_data_transfer_run_status
  13. from constants.constants import RUN_STATUS
  14. from utils.sx_time import sxtimeit
  15. from utils.sx_web import web_try
  16. from app.common.hive import hiveDs
  17. from app import get_db
  18. from configs.settings import DefaultOption, config
  # Hive database name read from the [HIVE] config section; endpoints in this
  # module present managed tables to clients as "<database_name>.<table>".
  database_name = config.get('HIVE', 'DATABASE_NAME')

  # Every data-management endpoint below is mounted under this prefix/tag.
  router = APIRouter(prefix="/jpt/datamanagement", tags=["datamanagement-数据管理"])
  @router.post("/")
  @web_try()
  @sxtimeit
  def create_data_management(item: schemas.DataManagementCreate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
      """Transfer a DAG node's intermediate-result table into a new table and record it.

      The target table is named
      ``project<project_id>_user<user_id>_<item.name>_<unix-seconds>`` (lower-cased),
      and the source table is resolved from ``item.dag_uuid``/``item.node_id``/
      ``item.out_pin``. Both names are qualified with ``database_name`` before
      being passed to ``data_transfer_run``.

      On success ``item.name`` is suffixed with the same timestamp. The record is
      then created via ``crud.create_data_management`` and its result returned.

      Raises:
          Exception: when the transfer response carries no ``af_run_id``.

      NOTE(review): ``item.name`` is client-supplied and is embedded verbatim
      in a Hive table name. Confirm that ``data_transfer_run`` validates/quotes it.
      """
      stamp = int(time.time())
      target_table = (
          f'project{token_data.project_id}_user{token_data.user_id}_{item.name}_{stamp}'
      ).lower()
      source_table = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)

      transfer_res = data_transfer_run(
          database_name + '.' + source_table,
          database_name + '.' + target_table,
      )
      # Missing keys are treated the same as an absent run id.
      run_payload = transfer_res.get('data')
      run_id = run_payload.get('af_run_id') if run_payload else None

      if not run_id:
          raise Exception('中间结果转存失败')

      item.name = item.name + '_' + str(stamp)
      return crud.create_data_management(db, item, target_table, run_id)
  @router.get("/")
  @web_try()
  @sxtimeit
  def get_data_managements(token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
      """List data-management records for the caller's user_id and project_id.

      Each returned record has its ``table_name`` rewritten to the
      database-qualified form ``<database_name>.<table_name>``.

      NOTE(review): the records are modified in place. If they are
      session-attached ORM instances and the session is later flushed or
      committed, the prefixed name would be written back. Confirm that
      ``get_db`` never commits on read-only requests.
      """
      def _qualify(record):
          # Prefix the stored bare table name with the Hive database name.
          record.table_name = f'{database_name}.{record.table_name}'
          return record

      records = crud.get_data_managements(db, token_data.user_id, token_data.project_id)
      return [_qualify(record) for record in records]
  def _extract_owner_and_location(rows):
      """Return ``(owner, location)`` parsed from table-info rows.

      ``rows`` is the first element of ``hiveDs.get_table_info(...)``'s result.
      Each row is read as ``row[0]`` (label) and ``row[1]`` (value), which is
      presumably Hive ``DESCRIBE FORMATTED``-style output (confirm against hiveDs).
      A row matches when its label contains ``'Location'`` / ``'Owner'``.
      The last matching row wins, and both default to ``''``.

      Rows whose label is not a string (e.g. NULL padding rows) are skipped
      instead of crashing on ``None.find``.
      """
      location = ''
      owner = ''
      for row in rows:
          label = row[0]
          if not isinstance(label, str):
              continue
          if 'Location' in label:
              location = row[1]
          if 'Owner' in label:
              owner = row[1]
      return owner, location


  @router.get("/info")
  @web_try()
  @sxtimeit
  def get_data_management_info(id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
      """Return one data-management record as a dict, refreshing its transfer status.

      Steps:
        * If the record's status is 1, the transfer run ``item.af_run_id`` is
          polled. When a status comes back, the record is updated with
          ``RUN_STATUS[status]``. Status 1 presumably means "transfer still
          running"; confirm against RUN_STATUS.
        * If the (possibly refreshed) status is 2, the table's Owner and
          Location are read from ``hiveDs.get_table_info``. Status 2
          presumably means "transfer finished"; confirm.
        * ``share_status`` comes from ``crud.check_share``.

      The returned dict is ``item.to_dict()`` with ``table_name`` qualified as
      ``<database_name>.<table>``, plus ``owner``, ``location`` and ``share_status``.

      Raises:
          Exception: if no record exists for ``id``.
      """
      item = crud.get_data_management_info(db, id)
      if item is None:
          # Fail with a clear message instead of an AttributeError on item.status.
          raise Exception(f'数据管理记录不存在: {id}')

      if item.status == 1:
          transfer_run_res = get_data_transfer_run_status(item.af_run_id)
          transfer_run = transfer_run_res.get('data')
          transfer_run_status = transfer_run.get('status') if transfer_run else None
          if transfer_run_status:
              item = crud.update_data_management_status(db, item.id, RUN_STATUS[transfer_run_status])

      owner, location = '', ''
      if item.status == 2:
          table_info = hiveDs.get_table_info(item.table_name)
          owner, location = _extract_owner_and_location(table_info[0])

      share_status = crud.check_share(db, item.table_name)
      item_dict = item.to_dict()
      item_dict.update({
          'table_name': f'{database_name}.{item.table_name}',
          'owner': owner,
          'location': location,
          'share_status': share_status,
      })
      return item_dict
  @router.delete("/")
  @web_try()
  @sxtimeit
  def delete_data_management(data_management_id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
      """Delete the data-management record ``data_management_id``.

      Returns whatever ``crud.delete_data_management`` returns. ``token_data`` is
      resolved through ``verify_users`` and is otherwise unused here.

      NOTE(review): there is no ownership check in this handler. Confirm that
      the crud layer restricts deletion to the caller's records. It is also
      not visible here whether the underlying Hive table is dropped.
      """
      return crud.delete_data_management(db, data_management_id)
  @router.get("/table_content")
  @web_try()
  @sxtimeit
  def get_data_management_content(table_name: str, page: Optional[int] = 1, size: Optional[int] = 100, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
      """Return one page of rows from a table, plus the table's total row count.

      Args:
          table_name: table name, optionally database-qualified (``<db>.<table>``).
              Only the part after the last ``.`` is passed to hiveDs.
          page: 1-based page number. ``None`` or values < 1 fall back to 1.
          size: rows per page. ``None`` or values < 1 fall back to 100.

      Returns:
          The dict from ``hiveDs.get_preview_data`` with an added ``total`` key
          holding ``hiveDs.get_data_num(table_name)``.

      NOTE(review): ``table_name`` is client-controlled. Confirm that hiveDs
      quotes/validates it and that reading tables outside the caller's project
      is intended.
      """
      table_name = table_name.split('.')[-1]
      # page/size are unvalidated query params. Without this guard, page <= 0
      # or a negative size would produce a negative start offset, and None
      # would raise a TypeError in the offset arithmetic.
      if page is None or page < 1:
          page = 1
      if size is None or size < 1:
          size = 100
      result = hiveDs.get_preview_data(table_name, size=size, start=(page - 1) * size)
      data_num = hiveDs.get_data_num(table_name)
      result.update({'total': data_num})
      return result
  @router.get("/table_schema")
  @web_try()
  @sxtimeit
  def get_data_management_schema(table_name: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
      """Return ``hiveDs.get_table_schema`` for a table.

      ``table_name`` may be database-qualified (``<db>.<table>``). Only the
      segment after the last ``.`` is looked up.
      """
      bare_name = table_name.rsplit('.', 1)[-1]
      return hiveDs.get_table_schema(bare_name)