af_util.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import datetime
  2. import json
  3. import requests
  4. from app.common.minio import FileHandler
  5. from configs.settings import config
  6. def spark_result_tb_name(job_id, task_id, spark_node_id, out_pin, is_tmp=False):
  7. return f'{config.get("HIVE", "database_name")}.job{job_id}_task{task_id}_subnode{spark_node_id}_output{out_pin}{"_tmp" if is_tmp else ""}'
  8. def get_sub_task_script_uri(task_id, sub_node_id):
  9. return f'/xxx/task_{task_id}/sub_{sub_node_id}.py'
  10. def get_remote_file(uri, return_json=False):
  11. data = requests.get(uri).content
  12. return json.loads(data) if return_json else data
  13. def upload2oss(content: bytes, uri: str, minio_bucket: str):
  14. minio_handler = FileHandler(bucket_name=minio_bucket)
  15. minio_handler.put_byte_file(file_name=uri, file_content=content)
  16. def get_job_path(job_id):
  17. dag_path = f'{config.get("AF_BACKEND", "dag_files_dir")}'
  18. return dag_path + f'dag_{job_id}.py'
  19. def get_airflow_api_info():
  20. uri_prefix = f'http://{config.get("AIRFLOW", "uri")}/api/v1'
  21. headers = {
  22. 'content-type': 'application/json',
  23. 'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
  24. }
  25. host_in_header = config.get("AIRFLOW", "host_in_header", fallback=None)
  26. if host_in_header not in ['', None]:
  27. headers['Host'] = host_in_header
  28. return uri_prefix, headers
  29. def call_airflow_api(method, uri, args_dict):
  30. uri_prefix = f'http://{config.get("AIRFLOW", "uri")}/api/v1'
  31. headers = {
  32. 'content-type': 'application/json',
  33. 'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
  34. }
  35. host_in_header = config.get("AIRFLOW", "host_in_header", fallback=None)
  36. if host_in_header not in ['', None]:
  37. headers['Host'] = host_in_header
  38. if method == 'post':
  39. return requests.post(uri_prefix + '/' + uri, headers=headers, **args_dict)
  40. if method == 'get':
  41. print(f"enter get, uri is {uri_prefix + '/' + uri}")
  42. return requests.get(uri_prefix + '/' + uri, headers=headers, **args_dict)
  43. def datetime2timestamp(datetime_str):
  44. if datetime_str is None:
  45. return None
  46. ts = None
  47. try:
  48. ts = datetime.datetime.strptime(datetime_str,'%Y-%m-%dT%H:%M:%S%z').timestamp()
  49. except Exception:
  50. ts = datetime.datetime.strptime(datetime_str,'%Y-%m-%dT%H:%M:%S.%f%z').timestamp()
  51. finally:
  52. return ts