from asyncio import current_task
from re import A, I
import time
from typing import Optional
from fastapi import APIRouter
from fastapi import Depends
from sqlalchemy.orm import Session
from app import schemas
from app.common.decorators import verify_all
import app.crud as crud
from app.services.dag import get_tmp_table_name
from app.utils.send_util import data_transfer_run, get_data_transfer_run_status
from constants.constants import RUN_STATUS
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from app.common.hive import hiveDs
from configs.globals import g
from app import get_db
from configs.settings import DefaultOption, config

# NOTE(review): current_task, A, I and DefaultOption are not used in this
# module; they are kept in case something imports them from here.

# Hive database used both for the source (tmp) tables and for the tables this
# module creates via data_transfer_run.
database_name = config.get('HIVE', 'DATABASE_NAME')


router = APIRouter(
    prefix="/jpt/datamanagement",
    tags=["datamanagement-数据管理"],
)


def _get_field(mapping, key):
    """Return ``mapping[key]`` if ``mapping`` is non-empty and has ``key``, else None.

    Used to read nested fields from service responses without failing when a
    response (or a nested part of it) is missing.
    """
    if not mapping:
        return None
    return mapping.get(key)


def _qualified_table_name(table_name):
    """Prefix ``table_name`` with the configured Hive database name."""
    return f'{database_name}.{table_name}'


@router.post("/", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
    """Persist a DAG node's intermediate result as a new named table.

    The node's temporary table (resolved by ``get_tmp_table_name``) is copied
    into ``project{project_id}_user{user_id}_{name}_{timestamp}`` (lower-cased)
    through ``data_transfer_run``, and a record is created with
    ``crud.create_data_management``.

    Raises:
        Exception: if the transfer request does not yield an ``af_run_id``.
    """
    current_time = int(time.time())
    # NOTE(review): item.name is user input and goes straight into a Hive table
    # name; confirm it is validated (e.g. in the schema) before reaching here.
    table_name = f'project{g.project_id}_user{g.user_id}_{item.name}_{current_time}'.lower()
    tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
    af_run_res = data_transfer_run(database_name + '.' + tmp_table_name,
                                   database_name + '.' + table_name)
    af_run_id = _get_field(_get_field(af_run_res, 'data'), 'af_run_id')
    if not af_run_id:
        # Message: "failed to transfer the intermediate result".
        raise Exception('中间结果转存失败')
    # The stored display name carries the same timestamp suffix as the table.
    item.name = item.name + '_' + str(current_time)
    return crud.create_data_management(db, item, table_name, af_run_id)


@router.get("/", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_data_managements(db: Session = Depends(get_db)):
    """List the current user's data-management records in the current project.

    Each returned record's ``table_name`` is qualified with the Hive database.
    """
    data_management_list = []
    for item in crud.get_data_managements(db, g.user_id, g.project_id):
        # NOTE(review): this mutates the object crud returned (presumably an
        # ORM instance bound to `db`); if the session is flushed/committed
        # afterwards the qualified name would be persisted. Confirm get_db
        # never commits.
        item.table_name = _qualified_table_name(item.table_name)
        data_management_list.append(item)
    return data_management_list


@router.get("/info", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_data_management_info(id: int, db: Session = Depends(get_db)):
    """Return one data-management record, refreshing its transfer status first.

    While the record's ``status`` is 1, the status of its transfer run
    (``af_run_id``) is queried; when one is reported it is mapped through
    ``RUN_STATUS`` and saved with ``crud.update_data_management_status``.
    The returned record's ``table_name`` is qualified with the Hive database.

    Raises:
        Exception: if no record exists for ``id``.
    """
    item = crud.get_data_management_info(db, id)
    if item is None:
        # Message: "data management record does not exist".
        raise Exception(f'数据管理记录不存在: {id}')
    if item.status == 1:
        transfer_run_res = get_data_transfer_run_status(item.af_run_id)
        transfer_run_status = _get_field(_get_field(transfer_run_res, 'data'), 'status')
        if transfer_run_status:
            # NOTE(review): a status missing from RUN_STATUS raises here.
            item = crud.update_data_management_status(db, item.id, RUN_STATUS[transfer_run_status])
    item.table_name = _qualified_table_name(item.table_name)
    return item


@router.delete("/", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def delete_data_management(data_management_id: int, db: Session = Depends(get_db)):
    """Delete a data-management record and return what crud returns.

    NOTE(review): nothing here checks that the record belongs to
    g.user_id / g.project_id; confirm crud or verify_all enforces that.
    """
    return crud.delete_data_management(db, data_management_id)


@router.get("/table_content", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_data_management_content(table_name: str, page: Optional[int] = 1, size: Optional[int] = 100, db: Session = Depends(get_db)):
    """Return one page of rows from a Hive table plus its total row count.

    ``table_name`` may be database-qualified; only the part after the last
    ``.`` is used. ``page`` is 1-based; ``None`` or values below 1 are treated
    as 1, and ``size`` defaults to 100 when ``None``. The total count is added
    to the preview result under ``'total'``.
    """
    # NOTE(review): table_name comes straight from the query string; confirm
    # hiveDs quotes/validates it and that callers may read any table.
    table_name = table_name.split('.')[-1]
    if page is None or page < 1:
        page = 1
    if size is None:
        size = 100
    result = hiveDs.get_preview_data(table_name, size=size, start=(page - 1) * size)
    result.update({'total': hiveDs.get_data_num(table_name)})
    return result


@router.get("/table_schema", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_data_management_schema(table_name: str, db: Session = Depends(get_db)):
    """Return the schema of a Hive table as reported by ``hiveDs``.

    ``table_name`` may be database-qualified; only the part after the last
    ``.`` is used. ``db`` is unused but kept for interface compatibility.
    """
    return hiveDs.get_table_schema(table_name.split('.')[-1])