import os
import stat

# Environment, PackageLoader and select_autoescape are used below; import them
# explicitly rather than relying on the wildcard import.
from jinja2 import Environment, PackageLoader, select_autoescape

from app.core.airflow.task import *
from app.schemas import AirflowJob


class AirflowJobSubmitter:

    @staticmethod
    def submit_dag(item: AirflowJob):
        # Map each task type to the compiler that turns it into an Airflow operator.
        m_compilers = {'python': PythonTaskCompiler,
                       'datax': DataXTaskCompiler,
                       'sparks': SparksTaskCompiler,
                       'java': JavaTaskCompiler}

        # Compile ordinary tasks and Spark tasks separately; the template handles them differently.
        nodes = [m_compilers[task.task_type](item=task).translate(item.job_mode or 1)
                 for task in item.tasks if task.task_type != 'sparks']
        spark_nodes = [SparksTaskCompiler(item=task).translate(item.job_mode or 1)
                       for task in item.tasks if task.task_type == 'sparks']

        # Turn the dependency pairs into edges between the generated operators.
        edges = []
        for edge in item.dependence:
            edges.append({"source_operator_name": f'op_{edge[0]}',
                          "target_operator_name": f'op_{edge[1]}'})

        # m_interval = {
        #     "None": "None",
        #     "@once": "@once",
        #     "0 * * * *": "@hourly",
        #     "0 0 * * *": "@daily",
        #     "0 0 * * 0": "@weekly",
        #     "0 0 1 * *": "@monthly",
        #     "0 0 1 1 *": "@yearly"
        # }

        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges,
                      'dag_id': f'dag_{item.id}', 'user_name': item.user_id, 'job_id': item.id,
                      'trigger_status': bool(item.trigger_status), 'interval': item.cron}

        # Render the DAG file from the packaged Jinja2 template.
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template("dag_template.py.jinja2")
        dag_content = template.render(parameters)
        print(f'finish build: {dag_content}')

        # Write the generated DAG into the Airflow dags folder and make it world-accessible (0o777).
        dag_path = '/dags/'
        output_path = dag_path + f'dag_{item.id}.py'
        with open(output_path, "w") as fh:
            fh.write(dag_content)
        os.chmod(output_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
        print(f'write dag to {output_path}')
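
# Illustrative usage only: the exact AirflowJob constructor and task schema are defined
# in app.schemas and may differ from what is sketched here (field values are assumptions).
#
#   job = AirflowJob(id=42, user_id='alice', cron='0 0 * * *', trigger_status=1,
#                    job_mode=1, dependence=[[1, 2]], tasks=[...])
#   AirflowJobSubmitter.submit_dag(job)
#
# submit_dag renders dag_template.py.jinja2 with the compiled nodes, spark_nodes and edges,
# then writes /dags/dag_42.py, which the Airflow scheduler picks up on its next DAG-folder scan.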