job.py

from jinja2 import Environment, PackageLoader, select_autoescape

from app.core.airflow.task import *  # provides PythonTaskCompiler, DataXTaskCompiler, SparksTaskCompiler
from app.schemas import AirflowJob


class AirflowJobSubmitter:
    @staticmethod
    def submit_dag(item: AirflowJob):
        # Map each task type to the compiler that turns it into operator code.
        m_compilers = {'python': PythonTaskCompiler, 'datax': DataXTaskCompiler, 'sparks': SparksTaskCompiler}

        # Compile non-Spark and Spark tasks separately; each translate() yields an operator snippet.
        nodes = [m_compilers[task.task_type](item=task).translate()
                 for task in item.tasks if task.task_type != 'sparks']
        spark_nodes = [SparksTaskCompiler(item=task).translate()
                       for task in item.tasks if task.task_type == 'sparks']

        # Each dependence pair (upstream, downstream) becomes an edge between named operators.
        edges = []
        for edge in item.dependence:
            edges.append({"source_operator_name": f'op_{edge[0]}',
                          "target_operator_name": f'op_{edge[1]}'})

        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges,
                      'dag_id': item.name, 'user_name': item.user_id, 'job_id': item.id}

        # Render the DAG file from the Jinja2 template packaged with app.core.airflow.
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template("dag_template.py.jinja2")
        dag_content = template.render(parameters)
        print(f'finish build: {dag_content}')

        # Write the generated DAG into the Airflow dags folder.
        dag_path = '/dags/'
        output_path = dag_path + f'{item.name}_{item.id}.py'
        with open(output_path, "w") as fh:
            fh.write(dag_content)
        print(f'write dag to {output_path}')
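
The file above only renders dag_template.py.jinja2; the template itself is not shown here. As a rough illustration of how the rendered parameters could be consumed, the sketch below uses the same variables that submit_dag passes to render() (nodes, spark_nodes, edges, dag_id, user_name, job_id). The DAG arguments and the operator wiring are assumptions for illustration, not the project's actual template.

# dag_template.py.jinja2 -- hypothetical sketch, not the real template
from airflow import DAG
from datetime import datetime

with DAG(dag_id='{{ dag_id }}',
         start_date=datetime(2022, 1, 1),
         schedule_interval=None,
         tags=['user_{{ user_name }}', 'job_{{ job_id }}']) as dag:
{% for node in nodes %}
    {{ node }}
{% endfor %}
{% for node in spark_nodes %}
    {{ node }}
{% endfor %}
{% for edge in edges %}
    {{ edge.source_operator_name }} >> {{ edge.target_operator_name }}
{% endfor %}

One detail worth noting: because the template name ends in .jinja2 rather than .html, select_autoescape() leaves autoescaping disabled by default, which is the desired behaviour when generating Python source instead of markup.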