import datetime
import json

import requests

from app.common.minio import FileHandler
from configs.settings import config

# Formats tried, in order, by datetime2timestamp: without and then with
# fractional seconds. Both require a UTC offset (%z).
_TIMESTAMP_FORMATS = (
    '%Y-%m-%dT%H:%M:%S%z',
    '%Y-%m-%dT%H:%M:%S.%f%z',
)


def spark_result_tb_name(job_id, task_id, spark_node_id, out_pin, is_tmp=False):
    """Return the qualified table name for one output pin of a sub-node.

    The database part is read from the ``HIVE.database_name`` config entry.
    When ``is_tmp`` is true, ``_tmp`` is appended to the table name.
    """
    database = config.get("HIVE", "database_name")
    suffix = "_tmp" if is_tmp else ""
    return (
        f'{database}.job{job_id}_task{task_id}'
        f'_subnode{spark_node_id}_output{out_pin}{suffix}'
    )


def get_sub_task_script_uri(task_id, sub_node_id):
    """Return the script URI for the given task / sub-node pair."""
    return f'/xxx/task_{task_id}/sub_{sub_node_id}.py'


def get_remote_file(uri, return_json=False):
    """Fetch ``uri`` with an HTTP GET and return its body.

    Returns the raw response bytes, or the JSON-decoded body when
    ``return_json`` is true. The HTTP status code is not checked.
    """
    # NOTE(review): no timeout is passed, so this call can block indefinitely
    # if the remote end never answers.
    data = requests.get(uri).content
    return json.loads(data) if return_json else data


def upload2oss(content: bytes, uri: str, minio_bucket: str):
    """Store ``content`` under the name ``uri`` in bucket ``minio_bucket``."""
    minio_handler = FileHandler(bucket_name=minio_bucket)
    minio_handler.put_byte_file(file_name=uri, file_content=content)


def get_job_path(job_id):
    """Return the DAG file path for ``job_id``.

    The ``AF_BACKEND.dag_files_dir`` config value is used as a plain string
    prefix; no path separator is inserted between it and the file name.
    """
    return f'{config.get("AF_BACKEND", "dag_files_dir")}dag_{job_id}.py'


def get_airflow_api_info():
    """Return ``(uri_prefix, headers)`` for calling the Airflow REST API.

    ``uri_prefix`` is built from the ``AIRFLOW.uri`` config entry and the
    headers carry the ``AIRFLOW.api_token`` value. A ``Host`` header is added
    only when ``AIRFLOW.host_in_header`` is configured and non-empty.
    """
    uri_prefix = f'http://{config.get("AIRFLOW", "uri")}/api/v1'
    headers = {
        'content-type': 'application/json',
        'Authorization': f'basic {config.get("AIRFLOW", "api_token")}',
    }
    host_in_header = config.get("AIRFLOW", "host_in_header", fallback=None)
    if host_in_header not in ['', None]:
        headers['Host'] = host_in_header
    return uri_prefix, headers


def call_airflow_api(method, uri, args_dict):
    """Send a request to the Airflow REST API and return the response.

    Args:
        method: ``'post'`` or ``'get'`` (matched case-sensitively).
        uri: Path appended to the API prefix after a ``/``.
        args_dict: Extra keyword arguments forwarded to ``requests``.

    Returns:
        The ``requests`` response object, or ``None`` when ``method`` is
        neither ``'post'`` nor ``'get'``.
    """
    # Reuse the shared helper instead of rebuilding prefix and headers here.
    uri_prefix, headers = get_airflow_api_info()
    url = uri_prefix + '/' + uri
    if method == 'post':
        return requests.post(url, headers=headers, **args_dict)
    if method == 'get':
        print(f"enter get, uri is {url}")
        return requests.get(url, headers=headers, **args_dict)
    # Unsupported methods have always yielded None; keep that explicit.
    return None


def datetime2timestamp(datetime_str):
    """Convert an ISO-8601-style datetime string to a POSIX timestamp.

    Two layouts are accepted, both with a UTC offset:
    ``YYYY-MM-DDTHH:MM:SS+zzzz`` and ``YYYY-MM-DDTHH:MM:SS.ffffff+zzzz``.

    Returns:
        The timestamp as a float, or ``None`` when ``datetime_str`` is
        ``None`` or matches neither layout.
    """
    if datetime_str is None:
        return None
    for fmt in _TIMESTAMP_FORMATS:
        try:
            return datetime.datetime.strptime(datetime_str, fmt).timestamp()
        except (ValueError, TypeError):
            # Wrong layout (or non-string input): try the next format.
            continue
    # Nothing matched. The previous implementation reached the same result
    # implicitly through a `return` inside `finally`, which hid the error.
    return None