12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- from importlib.resources import contents
- import json
- import os
- from fastapi import APIRouter
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi.responses import StreamingResponse
# Router grouping every DAG-management endpoint under the /jpt/dag prefix.
router = APIRouter(prefix="/jpt/dag", tags=["dag-dag管理"])
@router.get("/")
@web_try()
@sxtimeit
def get_dags(project_id: str, user_id: str):
    """List the DAG files found under the local ``./dag`` directory.

    ``project_id`` and ``user_id`` are accepted as request parameters but
    are not used by this function; the listing is the same for every caller.

    Returns:
        The list produced by ``get_all_dags("./dag")``.
    """
    return get_all_dags("./dag")
def get_all_dags(path):
    """Recursively collect the ``.dag`` files located under *path*.

    Args:
        path: Directory to scan. Child entries are joined with ``/``.

    Returns:
        A list of file paths in ``os.listdir`` order. A leading ``./dag``
        is removed from each path, so scanning ``./dag`` yields entries
        such as ``/sub/flow.dag``.

    Raises:
        OSError: If *path* (or a nested directory) cannot be listed.
    """
    root_prefix = './dag'
    file_list = []
    for file in os.listdir(path):
        if file == ".DS_Store":
            continue
        next_path = path + '/' + file
        if os.path.isdir(next_path):
            # Results of the recursive call already have the root stripped.
            file_list.extend(get_all_dags(next_path))
        elif file.endswith(".dag"):
            # Strip only the leading root. A blanket str.replace would also
            # eat any later "./dag" sequence (e.g. a directory named "x."
            # holding "dag.dag") and return a path that does not exist.
            if next_path.startswith(root_prefix):
                next_path = next_path[len(root_prefix):]
            file_list.append(next_path)
    return file_list
@router.get("/info")
# @web_try()  # NOTE(review): left disabled as in the original; presumably it would wrap the streamed body — confirm.
@sxtimeit
def get_file(uri: str):
    """Stream the raw bytes of one DAG file stored under ``./dag``.

    Args:
        uri: Path of the file relative to the DAG root, e.g. ``/sub/a.dag``
            (the format returned by the listing endpoint).

    Returns:
        A ``StreamingResponse`` that yields the file content in chunks.

    Raises:
        HTTPException: 404 when the resolved path leaves the DAG root or
            does not point at an existing regular file.
    """
    from fastapi import HTTPException

    dag_root = os.path.abspath('./dag')
    # abspath collapses ".." segments lexically, so a uri such as
    # "/../../etc/passwd" is detected instead of being opened.
    target = os.path.abspath('./dag' + uri)
    inside_root = target.startswith(os.path.join(dag_root, ''))
    # Validate before building the response: the generator opens the file
    # lazily, so a missing file would otherwise fail mid-stream.
    if not inside_root or not os.path.isfile(target):
        raise HTTPException(status_code=404, detail="dag file not found")
    return StreamingResponse(get_file_byte(target))
def get_file_byte(filename, chunk_size=1024):
    """Lazily yield the content of *filename* as successive byte chunks.

    Args:
        filename: Path of the file to read; opened in binary mode.
        chunk_size: Maximum number of bytes requested per read.

    Yields:
        Non-empty ``bytes`` objects until the file is exhausted.
    """
    with open(filename, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        yield from iter(lambda: stream.read(chunk_size), b"")
@router.post("/execute")
@web_try()
@sxtimeit
def execute_dag(dag_script: str):
    """Placeholder endpoint: echo the submitted DAG script to stdout.

    Nothing is executed here; the script is only printed.

    Returns:
        An empty string.
    """
    empty_result = ""
    print(dag_script)
    return empty_result
|