12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
import datetime
import json
import os

import requests

from app.common.minio import FileHandler
from configs.settings import config
def spark_result_tb_name(job_id, task_id, spark_node_id, out_pin, is_tmp=False):
    """Return the qualified, backtick-quoted table name for a Spark node output.

    The database part is read from the ``HIVE`` / ``database_name`` config
    entry. The table part encodes the job, task, sub-node and output pin
    identifiers, with a ``_tmp`` suffix appended when ``is_tmp`` is truthy.

    Args:
        job_id: Identifier of the job.
        task_id: Identifier of the task.
        spark_node_id: Identifier of the Spark sub-node.
        out_pin: Identifier of the output pin.
        is_tmp: When truthy, name the temporary variant of the table.

    Returns:
        str: A name of the form ``<database>.`<table>```.
    """
    database = config.get("HIVE", "database_name")
    suffix = "_tmp" if is_tmp else ""
    table = f'job{job_id}_task{task_id}_subnode{spark_node_id}_output{out_pin}{suffix}'
    return f'{database}.`{table}`'
def get_sub_task_script_uri(task_id, sub_node_id):
    """Return the script URI for one sub-node of a task.

    Args:
        task_id: Identifier of the task; used as the ``task_<id>`` directory.
        sub_node_id: Identifier of the sub-node; used as the ``sub_<id>.py``
            file name.

    Returns:
        str: A URI of the form ``/xxx/task_<task_id>/sub_<sub_node_id>.py``.
    """
    script_uri = f'/xxx/task_{task_id}/sub_{sub_node_id}.py'
    return script_uri
def get_remote_file(uri, return_json=False, timeout=60):
    """Fetch ``uri`` with an HTTP GET and return its body.

    Args:
        uri: URL to fetch.
        return_json: When truthy, parse the body with ``json.loads`` and
            return the decoded object instead of the raw bytes.
        timeout: Seconds to wait for the server (passed straight to
            ``requests.get``). Without a timeout a stalled server would
            block the caller indefinitely. Pass ``None`` to wait forever.

    Returns:
        bytes | object: The raw response body, or the decoded JSON value
        when ``return_json`` is truthy.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout.
        json.JSONDecodeError: When ``return_json`` is truthy and the body is
            not valid JSON.
    """
    # NOTE: the HTTP status code is intentionally not checked here, so the
    # body of an error response is returned like any other body.
    body = requests.get(uri, timeout=timeout).content
    if return_json:
        return json.loads(body)
    return body
def upload2oss(content: bytes, uri: str, minio_bucket: str):
    """Store ``content`` in a bucket through the project ``FileHandler``.

    Args:
        content: Raw bytes to store.
        uri: Name passed to the handler as ``file_name``.
        minio_bucket: Name of the bucket the handler is created for.
    """
    FileHandler(bucket_name=minio_bucket).put_byte_file(
        file_name=uri,
        file_content=content,
    )
def get_job_path(job_id):
    """Return the path of the DAG file generated for ``job_id``.

    The directory is read from the ``AF_BACKEND`` / ``dag_files_dir`` config
    entry and the file is named ``dag_<job_id>.py``.

    Args:
        job_id: Identifier of the job.

    Returns:
        str: ``<dag_files_dir>/dag_<job_id>.py``.
    """
    dag_dir = config.get("AF_BACKEND", "dag_files_dir")
    # os.path.join inserts the separator only when it is missing, so the
    # result is correct whether or not the configured directory ends with
    # one (plain concatenation silently produced ".../dagsdag_1.py").
    return os.path.join(dag_dir, f'dag_{job_id}.py')
def get_airflow_api_info():
    """Build the base URL and default headers for Airflow REST API calls.

    Reads ``uri``, ``api_token`` and the optional ``host_in_header`` entries
    from the ``AIRFLOW`` config section.

    Returns:
        tuple: ``(uri_prefix, headers)`` where ``uri_prefix`` is
        ``http://<uri>/api/v1`` and ``headers`` is a dict carrying the JSON
        content type, the ``Authorization`` header and, when
        ``host_in_header`` is configured and non-empty, a ``Host`` header.
    """
    airflow_address = config.get("AIRFLOW", "uri")
    api_token = config.get("AIRFLOW", "api_token")
    headers = {
        'content-type': 'application/json',
        'Authorization': f'basic {api_token}',
    }

    # Only set the Host header when a non-empty value is configured.
    host_override = config.get("AIRFLOW", "host_in_header", fallback=None)
    if host_override is not None and host_override != '':
        headers['Host'] = host_override

    return f'http://{airflow_address}/api/v1', headers
def call_airflow_api(method, uri, args_dict=None):
    """Send a request to the Airflow REST API.

    The base URL and default headers come from ``get_airflow_api_info`` so
    that both functions always agree on how the API is addressed.

    Args:
        method: ``'post'`` or ``'get'`` (lower case).
        uri: Path relative to the API prefix, without a leading slash.
        args_dict: Extra keyword arguments forwarded to ``requests.post`` /
            ``requests.get`` (for example ``json``, ``params``, ``timeout``).
            ``None`` is treated as no extra arguments.

    Returns:
        requests.Response | None: The response for ``'post'`` and ``'get'``;
        ``None`` for any other ``method`` value (no request is sent).
    """
    uri_prefix, headers = get_airflow_api_info()
    url = uri_prefix + '/' + uri
    request_kwargs = args_dict or {}

    if method == 'post':
        return requests.post(url, headers=headers, **request_kwargs)
    if method == 'get':
        # Trace output kept as-is for parity with existing behavior.
        print(f"enter get, uri is {url}")
        return requests.get(url, headers=headers, **request_kwargs)
    # Unsupported methods have always produced None; made explicit here.
    return None
def datetime2timestamp(datetime_str):
    """Convert an ISO-8601 datetime string with a UTC offset to a POSIX timestamp.

    Both ``YYYY-MM-DDTHH:MM:SS.ffffff+HH:MM`` and the variant without
    fractional seconds are accepted. The latter is what
    ``datetime.isoformat()`` emits when the microsecond field is zero, and
    it was previously rejected with ``ValueError``.

    Args:
        datetime_str: Datetime string including a UTC offset.

    Returns:
        float: Seconds since the Unix epoch.

    Raises:
        ValueError: If ``datetime_str`` matches none of the accepted formats.
    """
    accepted_formats = (
        '%Y-%m-%dT%H:%M:%S.%f%z',
        '%Y-%m-%dT%H:%M:%S%z',
    )
    last_error = None
    for fmt in accepted_formats:
        try:
            parsed = datetime.datetime.strptime(datetime_str, fmt)
        except ValueError as err:
            last_error = err
            continue
        return parsed.timestamp()
    raise ValueError(
        f'unsupported datetime format: {datetime_str!r}'
    ) from last_error
|