1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- import datetime
- import json
- import requests
- from app.common.minio import FileHandler
- from configs.settings import config
def spark_result_tb_name(job_id, task_id, spark_node_id, out_pin, is_tmp=False):
    """Build the fully-qualified Hive table name for one Spark sub-node output.

    The result has the form
    ``<database>.job<job_id>_task<task_id>_subnode<spark_node_id>_output<out_pin>``,
    with ``_tmp`` appended when ``is_tmp`` is truthy. ``<database>`` is read
    from the ``[HIVE] database_name`` config entry.
    """
    database = config.get("HIVE", "database_name")
    suffix = "_tmp" if is_tmp else ""
    table = f"job{job_id}_task{task_id}_subnode{spark_node_id}_output{out_pin}{suffix}"
    return f"{database}.{table}"
def get_sub_task_script_uri(task_id, sub_node_id):
    """Return the script path ``/xxx/task_<task_id>/sub_<sub_node_id>.py``."""
    task_dir = f'/xxx/task_{task_id}'
    return f'{task_dir}/sub_{sub_node_id}.py'
def get_remote_file(uri, return_json=False):
    """Fetch ``uri`` with an HTTP GET and return its body.

    Returns the raw response bytes, or the JSON-decoded body when
    ``return_json`` is truthy. The HTTP status code is not inspected.
    """
    response = requests.get(uri)
    body = response.content
    if return_json:
        return json.loads(body)
    return body
def upload2oss(content: bytes, uri: str, minio_bucket: str):
    """Store ``content`` under the object name ``uri`` in bucket ``minio_bucket``.

    A fresh ``FileHandler`` is created for every call; nothing is returned.
    """
    FileHandler(bucket_name=minio_bucket).put_byte_file(
        file_name=uri,
        file_content=content,
    )
def get_job_path(job_id):
    """Return the path of the DAG file for ``job_id``.

    The file is named ``dag_<job_id>.py`` and lives in the directory
    configured as ``[AF_BACKEND] dag_files_dir``.
    """
    import os.path  # local import: keeps the module-level import block untouched

    dag_dir = str(config.get("AF_BACKEND", "dag_files_dir"))
    # os.path.join adds a separator only when the configured directory lacks a
    # trailing one, so "/dags" and "/dags/" both yield "/dags/dag_<id>.py".
    # Plain concatenation produced "/dagsdag_<id>.py" for the former.
    return os.path.join(dag_dir, f'dag_{job_id}.py')
def get_airflow_api_info():
    """Return the Airflow REST API base URL and the headers to send with it.

    Returns:
        tuple: ``(uri_prefix, headers)``.

        * ``uri_prefix`` is ``http://<[AIRFLOW] uri>/api/v1``.
        * ``headers`` holds a JSON content type and a ``basic`` Authorization
          header built from ``[AIRFLOW] api_token``.
        * ``headers`` also holds a ``Host`` override, but only when
          ``[AIRFLOW] host_in_header`` is set to a non-empty value.
    """
    base_url = 'http://{}/api/v1'.format(config.get("AIRFLOW", "uri"))
    token = config.get("AIRFLOW", "api_token")
    request_headers = {
        'content-type': 'application/json',
        'Authorization': f'basic {token}',
    }
    host_override = config.get("AIRFLOW", "host_in_header", fallback=None)
    # An empty or missing setting means "do not override the Host header".
    if host_override is not None and host_override != '':
        request_headers['Host'] = host_override
    return base_url, request_headers
def call_airflow_api(method, uri, args_dict=None):
    """Send a request to the Airflow REST API (``/api/v1``).

    Args:
        method: HTTP method name, case-insensitive. ``'get'`` and ``'post'``
            were supported before; ``'put'``, ``'patch'`` and ``'delete'``
            are now accepted as well.
        uri: Endpoint path relative to ``/api/v1``. A leading ``/`` is ignored
            so that no ``//`` ends up in the URL.
        args_dict: Extra keyword arguments forwarded to ``requests.request``,
            for example ``json=``, ``params=`` or ``timeout=``. ``None`` means
            no extra arguments. It must not contain ``headers``, because the
            Airflow headers are always supplied.

    Returns:
        The ``requests.Response``. Its HTTP status is not checked here.

    Raises:
        ValueError: If ``method`` is not one of the supported HTTP methods.
            The previous implementation silently returned ``None`` here.
    """
    import logging  # local import: keeps the module-level import block untouched

    verb = str(method).lower()
    if verb not in ('get', 'post', 'put', 'patch', 'delete'):
        raise ValueError(f'unsupported Airflow API method: {method!r}')

    # Reuse the shared helper so the URL prefix and auth headers are defined once.
    uri_prefix, headers = get_airflow_api_info()
    url = f"{uri_prefix}/{uri.lstrip('/')}"
    logging.getLogger(__name__).debug('calling Airflow API: %s %s', verb.upper(), url)
    return requests.request(verb, url, headers=headers, **(args_dict or {}))
def datetime2timestamp(datetime_str):
    """Convert a timestamp string with a UTC offset to POSIX seconds.

    Two layouts are tried in order:

    * ``%Y-%m-%dT%H:%M:%S%z``
    * ``%Y-%m-%dT%H:%M:%S.%f%z`` (with fractional seconds)

    Both require an offset (``%z``), so the parsed datetime is timezone-aware
    and the result does not depend on the host's local timezone.

    Args:
        datetime_str: The string to parse, or ``None``.

    Returns:
        float | None: Seconds since the Unix epoch, or ``None`` when
        ``datetime_str`` is ``None`` or matches neither layout.
    """
    if datetime_str is None:
        return None
    for fmt in ('%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%dT%H:%M:%S.%f%z'):
        try:
            return datetime.datetime.strptime(datetime_str, fmt).timestamp()
        except (TypeError, ValueError):
            # Not this layout (or not a string at all); try the next one.
            continue
    # Neither layout matched. Keep the historical "None on bad input" result,
    # but without the former `return` inside `finally`, which also swallowed
    # unrelated exceptions such as KeyboardInterrupt.
    return None
|