"""FastAPI routes for listing ``.dag`` files and streaming one back."""

import io
import os
from multiprocessing.connection import wait

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse

from utils.sx_time import sxtimeit
from utils.sx_web import web_try

# Directory the `/info` route serves from; also the prefix removed from the
# paths returned by `get_all_dags`.
_DAG_ROOT = "./dag"

router = APIRouter(
    prefix="/jpt/dag_file",
    tags=["dag-dag管理"],
)


@router.get("/")
@web_try()
@sxtimeit
def get_dags(project_id: str, user_id: str):
    """List every ``.dag`` file found under ``./<user_id>/dag``.

    Args:
        project_id: Accepted as a query parameter but not used here.
        user_id: Name of the directory (relative to the working directory)
            whose ``dag`` sub-directory is scanned.

    Returns:
        The list produced by `get_all_dags` for that directory.
    """
    # NOTE(review): this scans "./<user_id>/dag" while `get_all_dags` strips a
    # leading "./dag" and `get_file` reads from "./dag" -- the returned paths
    # therefore are not usable as `uri` for `/info` as-is. Confirm which root
    # is intended.
    # NOTE(review): `user_id` is interpolated into a filesystem path without
    # validation; confirm it is trusted or constrain it upstream.
    path = f"./{user_id}/dag"
    return get_all_dags(path)


def _strip_dag_root(path):
    """Return *path* without a leading ``./dag``; other paths are unchanged."""
    # Only the leading occurrence is removed: a blanket str.replace would also
    # mangle nested names such as "./dag/a./dag/x.dag".
    if path.startswith(_DAG_ROOT):
        return path[len(_DAG_ROOT):]
    return path


def get_all_dags(path):
    """Recursively collect the ``.dag`` files below *path*.

    Directories are descended depth-first in ``os.listdir`` order and entries
    named ``.DS_Store`` are skipped.

    Args:
        path: Directory to scan.

    Returns:
        A list of file paths joined with ``/``; a leading ``./dag`` is removed
        from each path.

    Raises:
        OSError: Propagated from ``os.listdir`` (e.g. *path* does not exist).
    """
    file_list = []
    for file in os.listdir(path):
        if file == ".DS_Store":
            continue
        next_path = path + '/' + file
        if os.path.isdir(next_path):
            file_list.extend(get_all_dags(next_path))
        elif file.endswith(".dag"):
            file_list.append(_strip_dag_root(next_path))
    return file_list


def _resolve_dag_path(uri):
    """Map *uri* to a real file path inside the dag root, or ``None``.

    ``None`` is returned when the resolved path escapes the dag root (for
    example through ``..`` segments or a symlink pointing elsewhere), when the
    path cannot be resolved, or when it is not an existing regular file.
    """
    try:
        root = os.path.realpath(_DAG_ROOT)
        target = os.path.realpath(_DAG_ROOT + uri)
        inside = os.path.commonpath([root, target]) == root
    except ValueError:
        # Raised for embedded NUL bytes, or for paths on different drives.
        return None
    if not inside or not os.path.isfile(target):
        return None
    return target


@router.get("/info")
@sxtimeit
def get_file(uri: str):
    """Stream the contents of the file at ``./dag`` + *uri*.

    Args:
        uri: Path appended to ``./dag``. It comes from the request, so it is
            resolved and checked before any file is opened.

    Returns:
        A ``StreamingResponse`` yielding the file in binary chunks.

    Raises:
        HTTPException: 404 when *uri* does not name an existing file inside
            the dag root.
    """
    target = _resolve_dag_path(uri)
    if target is None:
        # Fail before streaming starts instead of erroring mid-response.
        # Assumes `sxtimeit` lets exceptions propagate -- TODO confirm.
        raise HTTPException(status_code=404, detail="file not found")
    return StreamingResponse(get_file_byte(target))


def get_file_byte(filename, chunk_size=1024):
    """Yield the bytes of *filename* in pieces of at most *chunk_size*.

    The file is opened in binary mode when iteration starts and closed once
    the generator is exhausted or closed.

    Args:
        filename: Path of the file to read.
        chunk_size: Size passed to each ``read`` call.

    Yields:
        Non-empty ``bytes`` chunks until end of file.
    """
    with open(filename, "rb") as f:
        # iter() with a sentinel stops at the first empty read (EOF).
        for chunk in iter(lambda: f.read(chunk_size), b""):
            yield chunk