1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- from asyncio import current_task
- from re import A
- import time
- from typing import Optional
- from fastapi import APIRouter
- from fastapi import Depends
- from sqlalchemy.orm import Session
- from app import schemas
- import app.crud as crud
- from app.services.dag import get_tmp_table_name
- from app.utils.send_util import data_transfer_run
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from app.common.hive import hiveDs
- from app import get_db
- from configs.settings import DefaultOption, config
- database_name = config.get('HIVE', 'DATABASE_NAME')
- router = APIRouter(
- prefix="/jpt/datamanagement",
- tags=["datamanagement-数据管理"],
- )
- @router.post("/")
- @web_try()
- @sxtimeit
- def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
- table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
- tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
- af_run_id = data_transfer_run(tmp_table_name, table_name)
- res = crud.create_data_management(db, item, table_name)
- return res
- @router.get("/")
- @web_try()
- @sxtimeit
- def get_data_managements(user_id: str, project_id: str, db: Session = Depends(get_db)):
- res = crud.get_data_managements(db, user_id, project_id)
- for item in res:
- item.table_name = f'{database_name}.{item.table_name}'
- return res
- @router.get("/local")
- @web_try()
- @sxtimeit
- def get_local_data_managements(db: Session = Depends(get_db)):
- t_list = hiveDs.list_tables()
- res = [f'{database_name}.{t}' for t in t_list]
- return res
- @router.get("/table_schema")
- @web_try()
- @sxtimeit
- def get_data_managements_schema(table_name: str, db: Session = Depends(get_db)):
- return hiveDs.get_table_schema(table_name)
- @router.delete("/")
- @web_try()
- @sxtimeit
- def delete_data_management(data_management_id: int, db: Session = Depends(get_db)):
- data_management = crud.delete_data_management(db, data_management_id)
- return data_management
- @router.get("/table_content")
- @web_try()
- @sxtimeit
- def get_data_management_content(table_name: str, page: Optional[int] = 0, size: Optional[int] = 100, db: Session = Depends(get_db)):
- result = hiveDs.get_preview_data(table_name,limit=size,page=page)
- data_num = hiveDs.get_data_num(table_name)
- result.update({'total':data_num})
- return result
|