import os
import stat
import time

from jinja2 import Environment, PackageLoader, select_autoescape

# Task compilers (PythonTaskCompiler, DataXTaskCompiler, SparksTaskCompiler,
# JavaTaskCompiler), AirflowTask and the shared `config` object are expected to
# be provided by this wildcard import.
from app.core.airflow.task import *
from app.core.airflow.af_util import get_job_path
from app.schemas import AirflowJob


class AirflowJobSubmitter:

    @staticmethod
    def submit_dag(item: AirflowJob):
        """Compile the job's tasks into DAG nodes/edges and render the DAG file."""
        m_compilers = {'python': PythonTaskCompiler,
                       'datax': DataXTaskCompiler,
                       'sparks': SparksTaskCompiler,
                       'java': JavaTaskCompiler}

        # Non-spark tasks are translated by their type-specific compiler;
        # spark tasks are collected into their own node list for the template.
        nodes = [m_compilers[task.task_type](item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                 for task in item.tasks if task.task_type != 'sparks']
        spark_nodes = [SparksTaskCompiler(item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                       for task in item.tasks if task.task_type == 'sparks']

        # Each dependence pair (upstream_id, downstream_id) becomes an operator edge.
        edges = []
        for edge in item.dependence:
            edges.append({"source_operator_name": f'op_{edge[0]}',
                          "target_operator_name": f'op_{edge[1]}'})

        # m_interval = {
        #     "None": "None",
        #     "@once": "@once",
        #     "0 * * * *": "@hourly",
        #     "0 0 * * *": "@daily",
        #     "0 0 * * 0": "@weekly",
        #     "0 0 1 * *": "@monthly",
        #     "0 0 1 1 *": "@yearly"
        # }
        # print(f"image pull key is : {config.get('K8S', 'image_pull_key')}")

        parameters = {'nodes': nodes,
                      'spark_nodes': spark_nodes,
                      'edges': edges,
                      'dag_id': f'dag_{item.id}',
                      'user_name': item.user_id,
                      'job_id': item.id,
                      'trigger_status': bool(item.trigger_status),
                      'interval': item.cron if item.cron != 'None' else None,
                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
                      'enable_notify': True}
        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters,
                                                    save_path=get_job_path(job_id=item.id))

    @staticmethod
    def generate_dag_on_airflow(parameters, save_path):
        """Render the DAG template with `parameters` and write the result to `save_path`."""
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template("dag_template.py.jinja2")
        dag_content = template.render(parameters)
        print(f'finish build: {dag_content}')
        with open(save_path, "w") as fh:
            fh.write(dag_content)
        # Make the generated DAG file readable/writable/executable for user, group and
        # others, so the Airflow scheduler can pick it up regardless of its run-as user.
        os.chmod(save_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
        print(f'write dag to {save_path}')

    @staticmethod
    def auto_submit_data_transfer():
        """Generate the built-in data-transfer DAG (job_id 0); manually triggered, no schedule."""
        job_id = 0
        user_id = 0
        spark_task_demo = SparksTaskCompiler(item=None)
        spark_nodes = [
            {
                "sub_nodes": [{
                    "name": 'read_and_save',
                    "id": 0,
                    "image": spark_task_demo.default_image,
                    "cmds": ['/bin/bash', '-c', spark_task_demo.cmd_str(name='spark_data_transfer')],
                    "env": {"SCRIPT": spark_task_demo.render_spark_script(
                        parameters={"hive_metastore_uris": config.get('HIVE_METASTORE', 'uris')},
                        template_file="data_transfer_dag_template.py.jinja2")
                    },
                }],
                "edges": [],
                "name": 'data_save',
                "desc": 'task for data saving',
                "id": 0,
            }
        ]

        parameters = {'nodes': [],
                      'spark_nodes': spark_nodes,
                      'edges': [],
                      'dag_id': f'dag_{job_id}',
                      'user_name': user_id,
                      'job_id': job_id,
                      'trigger_status': False,
                      'interval': None,
                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
                      'enable_notify': False}
        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters,
                                                    save_path=get_job_path(job_id=job_id))
        print('create data transfer job success!')

    @staticmethod
    def auto_submit_requirements_install():
        """Generate the built-in requirements-install DAG (job_id -1).

        The generated DAG installs the packages passed via ``dag_run.conf``, zips the
        py37 environment and pushes the archive to HDFS.
        """
        job_id = -1
        user_id = 0
        cmds = []
        if config.get('K8S', 'enable_kerberos', fallback=None) in ['true', 'True', True]:
            principal = config.get('HIVE', 'principal', fallback=None)
            cmds.append(f"kinit -kt /workspace/conf/user.keytab {principal}")
        cmds.append("cd /workspace")
        cmds.append("./py37/bin/pip3 install {{ dag_run.conf.get('requirements_str','numpy') }} "
                    "-i https://pypi.douban.com/simple/")
        cmds.append("zip -q -r /tmp/py37.zip py37")
        cmds.append("hdfs dfs -put -f /tmp/py37.zip {{dag_run.conf.get('target_path','/tmp/basic.zip')}}")
        py_task_demo = PythonTaskCompiler(item=AirflowTask(id=0, task_type="python", user_id=user_id,
                                                           cmd_parameters="", create_time=0, update_time=0,
                                                           name="requirements_install", file_urls=[], script="",
                                                           cmd=' && '.join(cmds), envs={},
                                                           run_image=SparksTaskCompiler(item=None).default_image))
        parameters = {'nodes': [py_task_demo.translate(job_id=job_id, task_mode=1)],
                      'spark_nodes': [],
                      'edges': [],
                      'dag_id': f'dag_{job_id}',
                      'user_name': user_id,
                      'job_id': job_id,
                      'trigger_status': False,
                      'interval': None,
                      'af_backend_uri': config.get('AF_BACKEND', 'uri'),
                      'image_pull_key': config.get('K8S', 'image_pull_key', fallback=None),
                      'enable_notify': False}
        AirflowJobSubmitter.generate_dag_on_airflow(parameters=parameters,
                                                    save_path=get_job_path(job_id=job_id))
        print('create requirements_install success!')