"""HTTP routes for data management: persisting DAG outputs as Hive tables and browsing them."""
from asyncio import current_task
from re import A
import time
from typing import Optional
from fastapi import APIRouter
from fastapi import Depends
from sqlalchemy.orm import Session
from app import schemas

import app.crud as crud
from app.services.dag import get_tmp_table_name
from app.utils.send_util import data_transfer_run
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from app.common.hive import hiveDs
from app import get_db
from configs.settings import DefaultOption, config

# Hive database name used to qualify table names returned to clients.
database_name = config.get('HIVE', 'DATABASE_NAME')

router = APIRouter(
    prefix="/jpt/datamanagement",
    tags=["datamanagement-数据管理"],
)


@router.post("/")
@web_try()
@sxtimeit
def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
    """Copy a DAG node's temporary output table into a new table and record it.

    The target table name is built from the lower-cased project id, user id
    and item name, followed by a timestamp suffix. The temporary table is
    looked up via ``get_tmp_table_name``, a transfer is started with
    ``data_transfer_run``, and the record is stored through
    ``crud.create_data_management``, whose result is returned.
    """
    # `current_time` was previously referenced without ever being defined,
    # so every call failed with NameError. Epoch seconds keeps the suffix
    # digits-only.
    # NOTE(review): confirm epoch seconds is the intended timestamp format.
    current_time = int(time.time())
    table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
    tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
    # The transfer's return value is not used here; only the call matters.
    data_transfer_run(tmp_table_name, table_name)
    return crud.create_data_management(db, item, table_name)


@router.get("/")
@web_try()
@sxtimeit
def get_data_managements(user_id: str, project_id: str, db: Session = Depends(get_db)):
    """Return the data-management records for a user and project.

    Each record's ``table_name`` is rewritten to ``<database_name>.<table_name>``
    before the list is returned.
    """
    res = crud.get_data_managements(db, user_id, project_id)
    # NOTE(review): records are modified in place; if they are
    # session-attached ORM rows, verify the prefixed name is never committed.
    for item in res:
        item.table_name = f'{database_name}.{item.table_name}'
    return res


@router.get("/local")
@web_try()
@sxtimeit
def get_local_data_managements(db: Session = Depends(get_db)):
    """Return every table reported by ``hiveDs.list_tables()``, database-qualified.

    ``db`` is not used by this handler; it is kept so the route signature
    stays unchanged.
    """
    t_list = hiveDs.list_tables()
    return [f'{database_name}.{t}' for t in t_list]


@router.get("/table_schema")
@web_try()
@sxtimeit
def get_data_managements_schema(table_name: str, db: Session = Depends(get_db)):
    """Return ``hiveDs.get_table_schema(table_name)`` for the given table.

    ``db`` is not used by this handler; it is kept so the route signature
    stays unchanged.
    """
    return hiveDs.get_table_schema(table_name)


@router.delete("/")
@web_try()
@sxtimeit
def delete_data_management(data_management_id: int, db: Session = Depends(get_db)):
    """Delete a data-management record by id and return the CRUD layer's result."""
    return crud.delete_data_management(db, data_management_id)


@router.get("/table_content")
@web_try()
@sxtimeit
def get_data_management_content(table_name: str, page: Optional[int] = 0, size: Optional[int] = 100, db: Session = Depends(get_db)):
    """Return one page of preview data for a table plus its total row count.

    ``size`` is passed to ``hiveDs.get_preview_data`` as ``limit`` and ``page``
    as ``page``. The count from ``hiveDs.get_data_num`` is added to the preview
    result under the ``'total'`` key. ``db`` is not used by this handler.
    """
    result = hiveDs.get_preview_data(table_name, limit=size, page=page)
    data_num = hiveDs.get_data_num(table_name)
    result.update({'total': data_num})
    return result