uri.py

import json

import requests

from app.common.minio import FileHandler
from configs.settings import config

def spark_result_tb_name(job_id, task_id, spark_node_id, out_pin, is_tmp=False):
    """Build the result table name for one output pin of a Spark sub-node."""
    return f'job{job_id}_task{task_id}_subnode{spark_node_id}_output{out_pin}{"_tmp" if is_tmp else ""}'
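
# For example, spark_result_tb_name(1, 2, 3, 0) yields
# 'job1_task2_subnode3_output0'; with is_tmp=True a '_tmp' suffix is
# appended to mark the table as intermediate.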

def get_sub_task_script_uri(task_id, sub_node_id):
    """Return the object-store path of the generated script for a sub-node."""
    return f'/xxx/task_{task_id}/sub_{sub_node_id}.py'

def get_remote_file(uri, return_json=False):
    """Download a remote file; optionally decode the body as JSON."""
    data = requests.get(uri).content
    return json.loads(data) if return_json else data
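
# Usage sketch (the URL is hypothetical, not part of this module):
#   meta = get_remote_file('http://files.internal/task_meta.json', return_json=True)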

def upload2oss(content: bytes, uri: str, minio_bucket: str):
    """Upload raw bytes to the given MinIO bucket under the path `uri`."""
    minio_handler = FileHandler(bucket_name=minio_bucket)
    minio_handler.put_byte_file(file_name=uri, file_content=content)
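
# Usage sketch (bucket name and object path are hypothetical):
#   upload2oss(b'print("hello")', 'task_1/sub_2.py', 'dag-scripts')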

def get_job_path(job_id):
    """Return the DAG file path for a job (dag_files_dir must end with '/')."""
    dag_path = config.get("AIRFLOW", "dag_files_dir")
    return dag_path + f'dag_{job_id}.py'

def get_airflow_api_info():
    """Return the Airflow REST API base URI and the common request headers."""
    uri_prefix = f'http://{config.get("AIRFLOW", "ip_address")}/api/v1'
    headers = {
        'content-type': 'application/json',
        'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
        'Host': f'{config.get("AIRFLOW", "host_in_header")}',
    }
    return uri_prefix, headers

def call_airflow_api(method, uri, args_dict):
    """Call the Airflow REST API; args_dict is forwarded to requests as kwargs."""
    # Reuse the shared base URI and headers instead of rebuilding them here.
    uri_prefix, headers = get_airflow_api_info()
    if method == 'post':
        return requests.post(uri_prefix + '/' + uri, headers=headers, **args_dict)
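
# A minimal usage sketch: the DAG id 'example_dag' and the run payload are
# assumed values, not part of this module. It triggers a DAG run through
# Airflow's stable REST API (POST /dags/{dag_id}/dagRuns); since args_dict is
# unpacked into requests.post, the JSON body is passed under the 'data' key.
if __name__ == '__main__':
    resp = call_airflow_api(
        method='post',
        uri='dags/example_dag/dagRuns',
        args_dict={'data': json.dumps({'conf': {}})},
    )
    print(resp.status_code, resp.text)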