# data_management.py — data-management endpoints (scrape artifacts removed)
  1. from asyncio import current_task
  2. from re import A, I
  3. import time
  4. from typing import Optional
  5. from fastapi import APIRouter
  6. from fastapi import Depends
  7. from sqlalchemy.orm import Session
  8. from app import schemas
  9. import app.crud as crud
  10. from app.services.dag import get_tmp_table_name
  11. from app.utils.send_util import data_transfer_run, get_data_transfer_run_status
  12. from constants.constants import RUN_STATUS
  13. from utils.sx_time import sxtimeit
  14. from utils.sx_web import web_try
  15. from app.common.hive import hiveDs
  16. from app import get_db
  17. from configs.settings import DefaultOption, config
  18. database_name = config.get('HIVE', 'DATABASE_NAME')
  19. router = APIRouter(
  20. prefix="/jpt/datamanagement",
  21. tags=["datamanagement-数据管理"],
  22. )
  23. @router.post("/")
  24. @web_try()
  25. @sxtimeit
  26. def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
  27. current_time = int(time.time())
  28. table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
  29. tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
  30. af_run_res = data_transfer_run(database_name+'.'+tmp_table_name, database_name+'.'+table_name)
  31. af_run = af_run_res['data'] if 'data' in af_run_res.keys() else None
  32. af_run_id = af_run['af_run_id'] if af_run and 'af_run_id' in af_run.keys() else None
  33. if af_run_id:
  34. item.name = item.name + '_' + str(current_time)
  35. res = crud.create_data_management(db, item, table_name, af_run_id)
  36. return res
  37. else:
  38. raise Exception('中间结果转存失败')
  39. @router.get("/")
  40. @web_try()
  41. @sxtimeit
  42. def get_data_managements(user_id: str, project_id: str, db: Session = Depends(get_db)):
  43. res = crud.get_data_managements(db, user_id, project_id)
  44. data_management_list = []
  45. for item in res:
  46. item.table_name = f'{database_name}.{item.table_name}'
  47. data_management_list.append(item)
  48. return data_management_list
  49. @router.get("/info")
  50. @web_try()
  51. @sxtimeit
  52. def get_data_management_info(id: int, db: Session = Depends(get_db)):
  53. item = crud.get_data_management_info(db, id)
  54. if item.status == 1:
  55. transfer_run_res = get_data_transfer_run_status(item.af_run_id)
  56. transfer_run = transfer_run_res['data'] if 'data' in transfer_run_res.keys() else None
  57. transfer_run_status = transfer_run['status'] if transfer_run and 'status' in transfer_run.keys() else None
  58. if transfer_run_status:
  59. item = crud.update_data_management_status(db, item.id, RUN_STATUS[transfer_run_status])
  60. item.table_name = f'{database_name}.{item.table_name}'
  61. return item
  62. @router.get("/local")
  63. @web_try()
  64. @sxtimeit
  65. def get_local_data_managements(db: Session = Depends(get_db)):
  66. t_list = hiveDs.list_tables()
  67. res = [f'{database_name}.{t}' for t in t_list]
  68. return res
  69. @router.get("/table_schema")
  70. @web_try()
  71. @sxtimeit
  72. def get_data_managements_schema(table_name: str, db: Session = Depends(get_db)):
  73. table_name = table_name.split('.')[-1]
  74. return hiveDs.get_table_schema(table_name)
  75. @router.delete("/")
  76. @web_try()
  77. @sxtimeit
  78. def delete_data_management(data_management_id: int, db: Session = Depends(get_db)):
  79. data_management = crud.delete_data_management(db, data_management_id)
  80. return data_management
  81. @router.get("/table_content")
  82. @web_try()
  83. @sxtimeit
  84. def get_data_management_content(table_name: str, page: Optional[int] = 0, size: Optional[int] = 100, db: Session = Depends(get_db)):
  85. table_name = table_name.split('.')[-1]
  86. result = hiveDs.get_preview_data(table_name,limit=size,page=page)
  87. data_num = hiveDs.get_data_num(table_name)
  88. result.update({'total':data_num})
  89. return result