import os
import stat

from jinja2 import Environment, PackageLoader, select_autoescape

from app.core.airflow.task import *
from app.core.airflow.uri import get_job_path
from app.schemas import AirflowJob


class AirflowJobSubmitter:

    @staticmethod
    def submit_dag(item: AirflowJob):
        # Map each task type to the compiler that turns it into a DAG node.
        m_compilers = {'python': PythonTaskCompiler,
                       'datax': DataXTaskCompiler,
                       'sparks': SparksTaskCompiler,
                       'java': JavaTaskCompiler}

        # Spark tasks are compiled separately from the other task types.
        nodes = [m_compilers[task.task_type](item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                 for task in item.tasks if task.task_type != 'sparks']
        spark_nodes = [SparksTaskCompiler(item=task).translate(job_id=item.id, task_mode=item.job_mode or 1)
                       for task in item.tasks if task.task_type == 'sparks']

        # Build operator-to-operator dependencies from the job's edge list.
        edges = [{"source_operator_name": f'op_{edge[0]}',
                  "target_operator_name": f'op_{edge[1]}'}
                 for edge in item.dependence]

        # Cron-to-preset mapping, kept commented out for reference; it is not used below.
        # m_interval = {
        #     "None": "None",
        #     "@once": "@once",
        #     "0 * * * *": "@hourly",
        #     "0 0 * * *": "@daily",
        #     "0 0 * * 0": "@weekly",
        #     "0 0 1 * *": "@monthly",
        #     "0 0 1 1 *": "@yearly"
        # }

        parameters = {'nodes': nodes,
                      'spark_nodes': spark_nodes,
                      'edges': edges,
                      'dag_id': f'dag_{item.id}',
                      'user_name': item.user_id,
                      'job_id': item.id,
                      'trigger_status': bool(item.trigger_status),
                      'interval': item.cron if item.cron != 'None' else None}

        # Render the DAG file from the Jinja2 template shipped with app.core.airflow.
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template("dag_template.py.jinja2")
        dag_content = template.render(parameters)
        print(f'finish build: {dag_content}')

        # Write the rendered DAG and make it readable, writable and executable
        # by owner, group and others (mode 0o777).
        output_path = get_job_path(job_id=item.id)
        with open(output_path, "w") as fh:
            fh.write(dag_content)
        os.chmod(output_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        print(f'write dag to {output_path}')
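

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): the field names
# below mirror the attributes submit_dag reads from AirflowJob (id, user_id,
# job_mode, cron, trigger_status, tasks, dependence); the exact constructor
# signature and the task item schema are assumptions for illustration only.
#
# if __name__ == "__main__":
#     job = AirflowJob(
#         id=42,                  # becomes dag_42 and the generated file's job id
#         user_id='demo_user',
#         job_mode=1,
#         cron='0 0 * * *',       # passed through as the DAG schedule interval
#         trigger_status=1,
#         tasks=[],               # task items with task_type in {'python', 'datax', 'sparks', 'java'}
#         dependence=[],          # edge list, e.g. [[1, 2]] for op_1 -> op_2
#     )
#     AirflowJobSubmitter.submit_dag(job)
# ---------------------------------------------------------------------------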