12345678910111213141516171819202122232425262728293031 |
- import json
- import requests
- from app.common.minio import FileHandler
def spark_result_tb_name(task_id, spark_node_id, out_pin, is_tmp=False):
    """Build the result name for one output pin of a Spark sub-node.

    The name has the form ``task<task_id>_subnode<spark_node_id>_output<out_pin>``
    and gets a ``_tmp`` suffix when *is_tmp* is truthy.

    Args:
        task_id: Identifier of the task, interpolated as-is.
        spark_node_id: Identifier of the Spark sub-node, interpolated as-is.
        out_pin: Output pin identifier, interpolated as-is.
        is_tmp: When truthy, append ``_tmp`` to the name.

    Returns:
        The formatted name string.
    """
    suffix = "_tmp" if is_tmp else ""
    return f'task{task_id}_subnode{spark_node_id}_output{out_pin}{suffix}'
def get_sub_task_script_uri(task_id, sub_node_id):
    """Return the script URI for a sub-node of a task.

    Args:
        task_id: Identifier of the task, interpolated as-is.
        sub_node_id: Identifier of the sub-node, interpolated as-is.

    Returns:
        A path string of the form ``/xxx/task_<task_id>/sub_<sub_node_id>.py``.
    """
    uri_template = '/xxx/task_{}/sub_{}.py'
    return uri_template.format(task_id, sub_node_id)
- # def get_spark_sub_task_inputs(task_id, sub_node_id, ):
- # return f'/xxx/results/tmp/task_{task_id}/sub_{sub_node_id}/'
- #
- #
- # def get_spark_sub_task_outputs(task_id, sub_node_id, number):
- # return [f'/xxx/results/tmp/task_{task_id}/sub_{sub_node_id}/result_{i}' for i in range(number)]
- #
def get_remote_file(uri, return_json=False, timeout=30):
    """Download a remote file over HTTP and return its content.

    Args:
        uri: URL to fetch with an HTTP GET request.
        return_json: When truthy, parse the response body as JSON and return
            the decoded object; otherwise return the raw bytes.
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.get``; use ``None`` to wait indefinitely
            (the previous behavior).

    Returns:
        The raw response body as ``bytes``, or the decoded JSON value when
        *return_json* is truthy.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within *timeout*.
        json.JSONDecodeError: If *return_json* is truthy and the body is not
            valid JSON.
    """
    # Without a timeout, requests may block forever on an unresponsive host.
    response = requests.get(uri, timeout=timeout)
    # Fail loudly instead of handing an error page back as file content.
    response.raise_for_status()
    payload = response.content
    return json.loads(payload) if return_json else payload
def upload2oss(content: bytes, uri: str, minio_bucket: str):
    """Store raw bytes in a MinIO bucket through the project ``FileHandler``.

    Args:
        content: Bytes to upload.
        uri: Name under which the content is stored (passed as ``file_name``).
        minio_bucket: Name of the bucket the handler is created for.
    """
    FileHandler(bucket_name=minio_bucket).put_byte_file(
        file_name=uri,
        file_content=content,
    )
|