job.py

import os
import stat

from jinja2 import Environment, PackageLoader, select_autoescape

from app.core.airflow.task import *
from app.schemas import AirflowJob


class AirflowJobSubmitter:

    @staticmethod
    def submit_dag(item: AirflowJob):
        # Map each task type to its compiler; spark tasks are compiled separately below.
        m_compilers = {'python': PythonTaskCompiler,
                       'datax': DataXTaskCompiler,
                       'sparks': SparksTaskCompiler,
                       'java': JavaTaskCompiler}
        nodes = [m_compilers[task.task_type](item=task).translate(item.job_mode or 1) for task in item.tasks if
                 task.task_type != 'sparks']
        spark_nodes = [SparksTaskCompiler(item=task).translate(item.job_mode or 1) for task in item.tasks if
                       task.task_type == 'sparks']

        # Build operator dependency edges from the job's dependence pairs.
        edges = []
        for edge in item.dependence:
            edges.append({"source_operator_name": f'op_{edge[0]}',
                          "target_operator_name": f'op_{edge[1]}'})

        # m_interval = {
        #     "None": "None",
        #     "@once": "@once",
        #     "0 * * * *": "@hourly",
        #     "0 0 * * *": "@daily",
        #     "0 0 * * 0": "@weekly",
        #     "0 0 1 * *": "@monthly",
        #     "0 0 1 1 *": "@yearly"
        # }

        parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': f'dag_{item.id}',
                      'user_name': item.user_id, 'job_id': item.id, 'trigger_status': bool(item.trigger_status),
                      'interval': item.cron
                      }

        # Render the DAG file from the Jinja2 template packaged with app.core.airflow.
        env = Environment(
            loader=PackageLoader('app.core.airflow'),
            autoescape=select_autoescape()
        )
        template = env.get_template("dag_template.py.jinja2")
        dag_content = template.render(parameters)
        print(f'finish build:{dag_content}')

        # Write the rendered DAG into the Airflow dags folder and open up its permissions.
        dag_path = '/dags/'
        output_path = dag_path + f'dag_{item.id}.py'
        with open(output_path, "w") as fh:
            fh.write(dag_content)
        os.chmod(output_path, stat.S_IRWXO | stat.S_IRWXG | stat.S_IRWXU)
        print(f'write dag to {output_path}')
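

# --- Hypothetical usage sketch (not part of the original module) ---
# A minimal illustration of how submit_dag might be called, assuming AirflowJob
# is a schema whose instances expose the attributes read above
# (id, user_id, job_mode, cron, trigger_status, tasks, dependence).
# The constructor keywords and example values below are assumptions for
# illustration only; the real AirflowJob definition lives in app.schemas.
#
# job = AirflowJob(
#     id=42,
#     user_id='analyst',
#     job_mode=1,
#     cron='0 0 * * *',        # passed through as the DAG's schedule interval
#     trigger_status=1,        # coerced to bool for the template
#     tasks=[...],             # task objects with task_type in {'python', 'datax', 'sparks', 'java'}
#     dependence=[[1, 2]],     # pairs of task ids: op_1 runs before op_2
# )
# AirflowJobSubmitter.submit_dag(job)  # would render and write /dags/dag_42.py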