123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- from asyncio import current_task
- from re import A, I
- import time
- from typing import Optional
- from fastapi import APIRouter
- from fastapi import Depends
- from sqlalchemy.orm import Session
- from app import schemas
- from app.common.security.auth import verify_users
- import app.crud as crud
- from app.services.dag import get_tmp_table_name
- from app.utils.send_util import data_transfer_run, get_data_transfer_run_status
- from constants.constants import RUN_STATUS
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from app.common.hive import hiveDs
- from app import get_db
- from configs.settings import DefaultOption, config
# Name of the Hive database used to qualify table names in this module;
# looked up once, at import time, under ('HIVE', 'DATABASE_NAME') in the
# project configuration.
database_name = config.get('HIVE', 'DATABASE_NAME')

# Router for the data-management endpoints; every route declared below is
# served under the /jpt/datamanagement prefix.
router = APIRouter(
    prefix="/jpt/datamanagement",
    tags=["datamanagement-数据管理"],
)
@router.post("/")
@web_try()
@sxtimeit
def create_data_management(item: schemas.DataManagementCreate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Start a transfer of a DAG node's temporary table and record it.

    The target table name is built from the caller's project id, user id,
    ``item.name`` and the current Unix timestamp, then lower-cased. The
    source is the temporary table resolved by ``get_tmp_table_name`` for
    ``item.dag_uuid`` / ``item.node_id`` / ``item.out_pin``. Both names are
    qualified with ``database_name`` before being handed to
    ``data_transfer_run``.

    Returns:
        Whatever ``crud.create_data_management`` returns for the new record.
        ``item.name`` is suffixed with ``_<timestamp>`` before it is stored.

    Raises:
        Exception: if the transfer response carries no ``af_run_id``.
    """
    timestamp = int(time.time())
    target_table = (
        f'project{token_data.project_id}_user{token_data.user_id}_{item.name}_{timestamp}'
    ).lower()
    source_table = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)

    response = data_transfer_run(
        database_name + '.' + source_table,
        database_name + '.' + target_table,
    )
    # The run id lives at response['data']['af_run_id']; either level may be
    # missing, in which case the transfer is treated as failed.
    run_info = response.get('data')
    run_id = run_info.get('af_run_id') if run_info else None
    if not run_id:
        raise Exception('中间结果转存失败')

    # Store the record under the timestamped name so it matches the table.
    item.name += '_' + str(timestamp)
    return crud.create_data_management(db, item, target_table, run_id)
@router.get("/")
@web_try()
@sxtimeit
def get_data_managements(token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """List the data-management records of the caller's user and project.

    Returns:
        A list of the records returned by ``crud.get_data_managements``,
        each with ``table_name`` rewritten to ``<database_name>.<table_name>``.
    """
    records = list(
        crud.get_data_managements(db, token_data.user_id, token_data.project_id)
    )
    # NOTE(review): the records are modified in place. If they are
    # session-attached ORM objects, confirm the qualified name can never be
    # flushed back to the database.
    for record in records:
        record.table_name = f'{database_name}.{record.table_name}'
    return records
@router.get("/info")
@web_try()
@sxtimeit
def get_data_management_info(id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Return one data-management record enriched with Hive and share info.

    While the record's status is 1, the transfer run is polled and, when it
    reports a status, the record is updated through
    ``crud.update_data_management_status``. When the (possibly refreshed)
    status is 2, the table's ``Location`` and ``Owner`` are read from
    ``hiveDs.get_table_info``; otherwise both stay empty strings.
    NOTE(review): status 1 / 2 presumably mean "transferring" / "succeeded";
    confirm against RUN_STATUS.

    Returns:
        ``item.to_dict()`` with ``table_name`` (qualified with
        ``database_name``), ``owner``, ``location`` and ``share_status``
        set.
    """
    record = crud.get_data_management_info(db, id)

    if record.status == 1:
        run_response = get_data_transfer_run_status(record.af_run_id)
        run_info = run_response.get('data')
        run_status = run_info.get('status') if run_info else None
        if run_status:
            # Map the reported run status onto the record's status code.
            record = crud.update_data_management_status(db, record.id, RUN_STATUS[run_status])

    location = ''
    owner = ''
    if record.status == 2:
        table_info = hiveDs.get_table_info(record.table_name)
        # Each row of table_info[0] is indexed as (label, value, ...). There
        # is no early exit, so the last matching row wins.
        for row in table_info[0]:
            label = row[0]
            if label.find('Location') != -1:
                location = row[1]
            if label.find('Owner') != -1:
                owner = row[1]

    share_status = crud.check_share(db, record.table_name)
    details = record.to_dict()
    details['table_name'] = f'{database_name}.{record.table_name}'
    details['owner'] = owner
    details['location'] = location
    details['share_status'] = share_status
    return details
@router.delete("/")
@web_try()
@sxtimeit
def delete_data_management(data_management_id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Delete the data-management record with the given id.

    Returns:
        The result of ``crud.delete_data_management``.
    """
    # NOTE(review): token_data is resolved but not consulted here, so no
    # ownership check is visible in this handler; confirm whether the crud
    # layer restricts deletion to the caller's own records.
    return crud.delete_data_management(db, data_management_id)
@router.get("/table_content")
@web_try()
@sxtimeit
def get_data_management_content(table_name: str, page: Optional[int] = 1, size: Optional[int] = 100, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Return one page of a table's rows together with the total row count.

    Args:
        table_name: Table to preview; any ``<database>.`` qualifier is
            stripped and only the last dotted component is used.
        page: 1-based page number. ``None`` falls back to 1.
        size: Number of rows per page. ``None`` falls back to 100.

    Returns:
        The mapping produced by ``hiveDs.get_preview_data`` with a ``total``
        key added from ``hiveDs.get_data_num``.

    Raises:
        ValueError: if ``page`` is below 1 or ``size`` is negative, since
            either would produce a negative offset or limit.
    """
    # Both parameters are declared Optional, so tolerate None instead of
    # failing with a TypeError in the offset arithmetic below.
    page = 1 if page is None else page
    size = 100 if size is None else size
    if page < 1:
        raise ValueError(f'page must be >= 1, got {page}')
    if size < 0:
        raise ValueError(f'size must be >= 0, got {size}')

    table_name = table_name.split('.')[-1]
    result = hiveDs.get_preview_data(table_name, size=size, start=(page - 1) * size)
    result.update({'total': hiveDs.get_data_num(table_name)})
    return result
@router.get("/table_schema")
@web_try()
@sxtimeit
def get_data_management_schema(table_name: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Return the schema of a table as reported by ``hiveDs.get_table_schema``.

    Args:
        table_name: Table to describe; only the part after the last ``.``
            is used, so a ``<database>.<table>`` name is accepted.
    """
    bare_name = table_name.rsplit('.', 1)[-1]
    return hiveDs.get_table_schema(bare_name)
|