from jinja2 import Environment, PackageLoader, select_autoescape

from app.core.airflow.task import *
from app.schemas import AirflowJob


class AirflowJobSubmitter:

    @staticmethod
    def submit_dag(item: AirflowJob):
        # Map each task type to its compiler; 'sparks' tasks are compiled separately below.
        m_compilers = {'python': PythonTaskCompiler, 'datax': DataXTaskCompiler, 'sparks': SparksTaskCompiler}
        nodes = [m_compilers[task.task_type](item=task).translate()
                 for task in item.tasks if task.task_type != 'sparks']
        spark_nodes = [SparksTaskCompiler(item=task).translate()
                       for task in item.tasks if task.task_type == 'sparks']

        # Dependencies are pairs of task ids; operators in the template are named op_<task_id>.
        edges = [{"source_operator_name": f'op_{edge[0]}',
                  "target_operator_name": f'op_{edge[1]}'}
                 for edge in item.dependence]

        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges,
                      'dag_id': item.name, 'user_name': item.user_id, 'job_id': item.id}

        # Render the DAG file from the Jinja2 template packaged with app.core.airflow.
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template("dag_template.py.jinja2")
        dag_content = template.render(parameters)
        print(f'finish build: {dag_content}')

        # Write the rendered DAG into the Airflow dags folder so the scheduler picks it up.
        output_path = f'/dags/{item.name}_{item.id}.py'
        with open(output_path, "w") as fh:
            fh.write(dag_content)
        print(f'write dag to {output_path}')
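The dag_template.py.jinja2 file itself is not part of this snippet. The following is a minimal sketch of a template compatible with the parameters passed above (nodes, spark_nodes, edges, dag_id, user_name, job_id); the operator class used and the id/cmd fields assumed on each translated node are illustrative assumptions, not the original template.

# Hypothetical sketch of app/core/airflow/templates/dag_template.py.jinja2
# (PackageLoader looks in the package's "templates" directory by default).
# Assumes each translated node exposes an 'id' and a 'cmd'; the real compilers'
# output format is not shown in the original code.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="{{ dag_id }}",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    tags=["user_{{ user_name }}", "job_{{ job_id }}"],
) as dag:
{% for node in nodes + spark_nodes %}
    op_{{ node.id }} = BashOperator(task_id="op_{{ node.id }}", bash_command="{{ node.cmd }}")
{% endfor %}
{% for edge in edges %}
    {{ edge.source_operator_name }} >> {{ edge.target_operator_name }}
{% endfor %}

Whatever the real template looks like, its operator variable names must follow the op_<task_id> convention, since that is exactly how submit_dag builds the source_operator_name and target_operator_name entries for the dependency edges.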