import json

import requests

from app.common.minio import FileHandler


def spark_result_tb_name(task_id, spark_node_id, out_pin, is_tmp: bool = False) -> str:
    """Build the table name for one output pin of a Spark sub-node.

    Args:
        task_id: Identifier of the task; interpolated into the name as-is.
        spark_node_id: Identifier of the Spark sub-node.
        out_pin: Output pin identifier of the sub-node.
        is_tmp: When true, the name gets a ``_tmp`` suffix.

    Returns:
        ``task{task_id}_subnode{spark_node_id}_output{out_pin}``, optionally
        followed by ``_tmp``.
    """
    suffix = "_tmp" if is_tmp else ""
    return f'task{task_id}_subnode{spark_node_id}_output{out_pin}{suffix}'


def get_sub_task_script_uri(task_id, sub_node_id) -> str:
    """Return the path of the generated script for a sub-node of a task.

    Args:
        task_id: Identifier of the task.
        sub_node_id: Identifier of the sub-node within the task.

    Returns:
        A path of the form ``/xxx/task_{task_id}/sub_{sub_node_id}.py``.
    """
    return f'/xxx/task_{task_id}/sub_{sub_node_id}.py'


# def get_spark_sub_task_inputs(task_id, sub_node_id, ):
#     return f'/xxx/results/tmp/task_{task_id}/sub_{sub_node_id}/'
#
#
# def get_spark_sub_task_outputs(task_id, sub_node_id, number):
#     return [f'/xxx/results/tmp/task_{task_id}/sub_{sub_node_id}/result_{i}' for i in range(number)]
#


def get_remote_file(uri: str, return_json: bool = False, timeout: float = 30.0):
    """Download a remote resource over HTTP(S).

    Args:
        uri: URL to fetch with a GET request.
        return_json: When true, parse the response body as JSON and return
            the decoded object; otherwise return the raw bytes.
        timeout: Seconds to wait for the server to connect / send data
            before giving up. Passed straight to ``requests.get``; ``None``
            disables the timeout.

    Returns:
        The response body as ``bytes``, or the decoded JSON value when
        ``return_json`` is true.

    Raises:
        requests.RequestException: On connection failure, timeout, or an
            HTTP error status (4xx/5xx).
        json.JSONDecodeError: If ``return_json`` is true and the body is not
            valid JSON.
    """
    response = requests.get(uri, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of handing an error page back to the
    # caller as if it were the requested file.
    response.raise_for_status()
    payload = response.content
    if return_json:
        return json.loads(payload)
    return payload


def upload2oss(content: bytes, uri: str, minio_bucket: str):
    """Store raw bytes in the given bucket through the project ``FileHandler``.

    Args:
        content: Bytes to upload.
        uri: Object name, passed to ``put_byte_file`` as ``file_name``.
        minio_bucket: Bucket name used to construct the ``FileHandler``.
    """
    handler = FileHandler(bucket_name=minio_bucket)
    handler.put_byte_file(file_name=uri, file_content=content)